From 192e228a98f3700f48d7fd136f4dce2979ec7c90 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 11:50:27 +0000 Subject: Start adding some tests --- tests/appservice/test_scheduler.py | 106 +++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 tests/appservice/test_scheduler.py (limited to 'tests/appservice') diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py new file mode 100644 index 0000000000..b41d4358cf --- /dev/null +++ b/tests/appservice/test_scheduler.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.appservice.scheduler import ( + AppServiceScheduler, AppServiceTransaction, _EventGrouper, + _TransactionController, _Recoverer +) +from twisted.internet import defer +from ..utils import MockClock +from mock import Mock +from tests import unittest + +class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): + + def setUp(self): + self.clock = MockClock() + self.as_api = Mock() + self.store = Mock() + self.service = Mock() + self.callback = Mock() + self.recoverer = _Recoverer( + clock=self.clock, + as_api=self.as_api, + store=self.store, + service=self.service, + callback=self.callback, + ) + + def test_recover_service_single_txn(self): + txns = self._mk_txns(1) + self.store.get_oldest_txn = Mock(return_value=defer.succeed(txns[0])) + + self.recoverer.recover() + self.assertEquals(0, self.store.get_oldest_txn.call_count) + self.clock.advance_time(2000) + self.assertEquals(2, self.store.get_oldest_txn.call_count) + + def _mk_txns(self, num_txns): + return [ + Mock() for i in range(num_txns) + ] + + + +class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): + + def setUp(self): + self.grouper = _EventGrouper() + + def test_drain_single_event(self): + service = Mock() + event = Mock() + self.grouper.on_receive(service, event) + groups = self.grouper.drain_groups() + self.assertTrue(service in groups) + self.assertEquals([event], groups[service]) + self.assertEquals(1, len(groups.keys())) + # no more events + self.assertEquals(self.grouper.drain_groups(), {}) + + def test_drain_multiple_events(self): + service = Mock() + events = [Mock(), Mock(), Mock()] + for e in events: + self.grouper.on_receive(service, e) + groups = self.grouper.drain_groups() + self.assertTrue(service in groups) + self.assertEquals(events, groups[service]) + # no more events + self.assertEquals(self.grouper.drain_groups(), {}) + + def test_drain_multiple_services(self): + services = [Mock(), Mock(), Mock()] + events_a = [Mock(), Mock()] + events_b = [Mock()] + events_c = [Mock(), Mock(), Mock(), Mock()] + mappings = { + services[0]: events_a, + services[1]: events_b, + services[2]: events_c + } + for e in events_b: + self.grouper.on_receive(services[1], e) + for e in events_c: + self.grouper.on_receive(services[2], e) + for e in events_a: + self.grouper.on_receive(services[0], e) + + groups = self.grouper.drain_groups() + for service in services: + self.assertTrue(service in groups) + self.assertEquals(mappings[service], groups[service]) + self.assertEquals(3, len(groups.keys())) + # no more events + self.assertEquals(self.grouper.drain_groups(), {}) -- cgit 1.5.1 From 0fbfe1b08a791e95dc9e9d417f131e80b4ce8059 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 14:36:52 +0000 Subject: Add more tests; fix bugs. --- synapse/appservice/scheduler.py | 4 +-- tests/appservice/test_scheduler.py | 54 +++++++++++++++++++++++++++++++++----- 2 files changed, 49 insertions(+), 9 deletions(-) (limited to 'tests/appservice') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 754f39381f..f54df9c9a5 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -174,7 +174,7 @@ class _Recoverer(object): self.backoff_counter = 1 def recover(self): - self.clock.call_later(2000 ** self.backoff_counter, self.retry) + self.clock.call_later(1000 * (2 ** self.backoff_counter), self.retry) @defer.inlineCallbacks def retry(self): @@ -184,7 +184,7 @@ class _Recoverer(object): txn.complete(self.store) # reset the backoff counter and retry immediately self.backoff_counter = 1 - self.retry() + yield self.retry() else: self.backoff_counter += 1 self.recover() diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index b41d4358cf..1e3eb9e1cc 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -21,6 +21,7 @@ from ..utils import MockClock from mock import Mock from tests import unittest + class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): def setUp(self): @@ -37,21 +38,60 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): callback=self.callback, ) - def test_recover_service_single_txn(self): - txns = self._mk_txns(1) - self.store.get_oldest_txn = Mock(return_value=defer.succeed(txns[0])) + def test_recover_single_txn(self): + txn = Mock() + # return one txn to send, then no more old txns + txns = [txn, None] + + def take_txn(*args, **kwargs): + return defer.succeed(txns.pop(0)) + self.store.get_oldest_txn = Mock(side_effect=take_txn) self.recoverer.recover() + # shouldn't have called anything prior to waiting for exp backoff self.assertEquals(0, self.store.get_oldest_txn.call_count) + txn.send = Mock(return_value=True) + # wait for exp backoff self.clock.advance_time(2000) + self.assertEquals(1, txn.send.call_count) + self.assertEquals(1, txn.complete.call_count) + # 2 because it needs to get None to know there are no more txns self.assertEquals(2, self.store.get_oldest_txn.call_count) + self.assertEquals(1, self.callback.call_count) - def _mk_txns(self, num_txns): - return [ - Mock() for i in range(num_txns) - ] + def test_recover_retry_txn(self): + txn = Mock() + txns = [txn, None] + pop_txn = False + def take_txn(*args, **kwargs): + if pop_txn: + return defer.succeed(txns.pop(0)) + else: + return defer.succeed(txn) + self.store.get_oldest_txn = Mock(side_effect=take_txn) + self.recoverer.recover() + self.assertEquals(0, self.store.get_oldest_txn.call_count) + txn.send = Mock(return_value=False) + self.clock.advance_time(2000) + self.assertEquals(1, txn.send.call_count) + self.assertEquals(0, txn.complete.call_count) + self.assertEquals(0, self.callback.call_count) + self.clock.advance_time(4000) + self.assertEquals(2, txn.send.call_count) + self.assertEquals(0, txn.complete.call_count) + self.assertEquals(0, self.callback.call_count) + self.clock.advance_time(8000) + self.assertEquals(3, txn.send.call_count) + self.assertEquals(0, txn.complete.call_count) + self.assertEquals(0, self.callback.call_count) + txn.send = Mock(return_value=True) # successfully send the txn + pop_txn = True # returns the txn the first time, then no more. + self.clock.advance_time(16000) + self.assertEquals(1, txn.send.call_count) # new mock reset call count + self.assertEquals(1, txn.complete.call_count) + self.assertEquals(1, self.callback.call_count) class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): -- cgit 1.5.1 From f260cb72cd3435d540411962a92ca2a9fd333eb1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 15:12:24 +0000 Subject: Flesh out more stub functions. --- synapse/appservice/__init__.py | 5 +++++ synapse/appservice/scheduler.py | 37 +++++++++++++++++++++++++++++-------- synapse/storage/appservice.py | 17 +++++++++++++++-- tests/appservice/test_scheduler.py | 5 +++-- 4 files changed, 52 insertions(+), 12 deletions(-) (limited to 'tests/appservice') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index a268a6bcc4..cc6c381566 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -20,6 +20,11 @@ import re logger = logging.getLogger(__name__) +class ApplicationServiceState(object): + DOWN = "down" + UP = "up" + + class ApplicationService(object): """Defines an application service. This definition is mostly what is provided to the /register AS API. diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 645d7bf6b2..99e83747a8 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -49,7 +49,11 @@ This is all tied together by the AppServiceScheduler which DIs the required components. """ +from synapse.appservice import ApplicationServiceState from twisted.internet import defer +import logging + +logger = logging.getLogger(__name__) class AppServiceScheduler(object): @@ -162,21 +166,36 @@ class _TransactionController(object): if txn.send(self.as_api): txn.complete(self.store) else: - # TODO mark AS as down self._start_recoverer(service) self.clock.call_later(1000, self.start_polling) - def on_recovered(self, service): - # TODO mark AS as UP - pass + @defer.inlineCallbacks + def on_recovered(self, recoverer): + applied_state = yield self.store.set_appservice_state( + recoverer.service, + ApplicationServiceState.UP + ) + if not applied_state: + logger.error("Failed to apply appservice state UP to service %s", + recoverer.service) def add_recoverers(self, recoverers): for r in recoverers: self.recoverers.append(r) + @defer.inlineCallbacks def _start_recoverer(self, service): - recoverer = self.recoverer_fn(service, self.on_recovered) - recoverer.recover() + applied_state = yield self.store.set_appservice_state( + service, + ApplicationServiceState.DOWN + ) + if applied_state: + recoverer = self.recoverer_fn(service, self.on_recovered) + self.add_recoverers([recoverer]) + recoverer.recover() + else: + logger.error("Failed to apply appservice state DOWN to service %s", + service) def _is_service_up(self, service): pass @@ -193,7 +212,9 @@ class _Recoverer(object): @staticmethod @defer.inlineCallbacks def start(clock, store, as_api, callback): - services = yield store.get_failing_appservices() + services = yield store.get_appservices_by_state( + ApplicationServiceState.DOWN + ) recoverers = [ _Recoverer(clock, store, as_api, s, callback) for s in services ] @@ -228,7 +249,7 @@ class _Recoverer(object): self._set_service_recovered() def _set_service_recovered(self): - self.callback(self.service) + self.callback(self) @defer.inlineCallbacks def _get_oldest_txn(self): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index c1762692b9..214f6d99c5 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -343,15 +343,28 @@ class ApplicationServiceTransactionStore(SQLBaseStore): def __init__(self, hs): super(ApplicationServiceTransactionStore, self).__init__(hs) - def get_failing_appservices(self): - """Get a list of application services which are down. + def get_appservices_by_state(self, state): + """Get a list of application services based on their state. + Args: + state(ApplicationServiceState): The state to filter on. Returns: A Deferred which resolves to a list of ApplicationServices, which may be empty. """ pass + def set_appservice_state(self, service, state): + """Set the application service state. + + Args: + service(ApplicationService): The service whose state to set. + state(ApplicationServiceState): The connectivity state to apply. + Returns: + A Deferred which resolves to True if the state was set successfully. + """ + pass + def complete_appservice_txn(self, txn_id, service): """Completes an application service transaction. diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 1e3eb9e1cc..ec8f77c54b 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -57,7 +57,8 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.assertEquals(1, txn.complete.call_count) # 2 because it needs to get None to know there are no more txns self.assertEquals(2, self.store.get_oldest_txn.call_count) - self.assertEquals(1, self.callback.call_count) + self.callback.assert_called_once_with(self.recoverer) + self.assertEquals(self.recoverer.service, self.service) def test_recover_retry_txn(self): txn = Mock() @@ -91,7 +92,7 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.clock.advance_time(16000) self.assertEquals(1, txn.send.call_count) # new mock reset call count self.assertEquals(1, txn.complete.call_count) - self.assertEquals(1, self.callback.call_count) + self.callback.assert_called_once_with(self.recoverer) class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): -- cgit 1.5.1 From 0354659f9d8b60b9edc78b0b597bceb52b8c7b2b Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 16:09:05 +0000 Subject: Finish synapse.appservice.scheduler implementation. With tests to assert behaviour. Not hooked up yet. Stub datastore methods not implemented yet. --- synapse/appservice/__init__.py | 39 +++++++++++++ synapse/appservice/scheduler.py | 63 ++++---------------- synapse/storage/appservice.py | 22 +++++++ tests/appservice/test_scheduler.py | 115 ++++++++++++++++++++++++++++++++++++- 4 files changed, 186 insertions(+), 53 deletions(-) (limited to 'tests/appservice') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index cc6c381566..743a8278ad 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -25,6 +25,45 @@ class ApplicationServiceState(object): UP = "up" +class AppServiceTransaction(object): + """Represents an application service transaction.""" + + def __init__(self, service, id, events): + self.service = service + self.id = id + self.events = events + + def send(self, as_api): + """Sends this transaction using the provided AS API interface. + + Args: + as_api(ApplicationServiceApi): The API to use to send. + Returns: + A Deferred which resolves to True if the transaction was sent. + """ + return as_api.push_bulk( + service=self.service, + events=self.events, + txn_id=self.id + ) + + def complete(self, store): + """Completes this transaction as successful. + + Marks this transaction ID on the application service and removes the + transaction contents from the database. + + Args: + store: The database store to operate on. + Returns: + A Deferred which resolves to True if the transaction was completed. + """ + return store.complete_appservice_txn( + service=self.service, + txn_id=self.id + ) + + class ApplicationService(object): """Defines an application service. This definition is mostly what is provided to the /register AS API. diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 2b3aa3b0ea..50ad3b8e83 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -88,45 +88,6 @@ class AppServiceScheduler(object): self.event_grouper.on_receive(service, event) -class AppServiceTransaction(object): - """Represents an application service transaction.""" - - def __init__(self, service, id, events): - self.service = service - self.id = id - self.events = events - - def send(self, as_api): - """Sends this transaction using the provided AS API interface. - - Args: - as_api(ApplicationServiceApi): The API to use to send. - Returns: - A Deferred which resolves to True if the transaction was sent. - """ - return as_api.push_bulk( - service=self.service, - events=self.events, - txn_id=self.id - ) - - def complete(self, store): - """Completes this transaction as successful. - - Marks this transaction ID on the application service and removes the - transaction contents from the database. - - Args: - store: The database store to operate on. - Returns: - A Deferred which resolves to True if the transaction was completed. - """ - return store.complete_appservice_txn( - service=self.service, - txn_id=self.id - ) - - class _EventGrouper(object): """Groups events for the same application service together. """ @@ -156,14 +117,18 @@ class _TransactionController(object): # keep track of how many recoverers there are self.recoverers = [] + @defer.inlineCallbacks def start_polling(self): groups = self.event_grouper.drain_groups() for service in groups: - txn_id = self._get_next_txn_id(service) - txn = AppServiceTransaction(service, txn_id, groups[service]) - self._store_txn(txn) - if self._is_service_up(service): - if txn.send(self.as_api): + txn = yield self.store.create_appservice_txn( + service=service, + events=groups[service] + ) + service_is_up = yield self._is_service_up(service) + if service_is_up: + sent = yield txn.send(self.as_api) + if sent: txn.complete(self.store) else: self._start_recoverer(service) @@ -207,14 +172,10 @@ class _TransactionController(object): logger.error("Failed to apply appservice state DOWN to service %s", service) + @defer.inlineCallbacks def _is_service_up(self, service): - pass - - def _get_next_txn_id(self, service): - pass # TODO work out the next txn_id for this service - - def _store_txn(self, txn): - pass + state = yield self.store.get_appservice_state(service) + defer.returnValue(state == ApplicationServiceState.UP) class _Recoverer(object): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 214f6d99c5..6fde7dcc66 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -354,6 +354,16 @@ class ApplicationServiceTransactionStore(SQLBaseStore): """ pass + def get_appservice_state(self, service): + """Get the application service state. + + Args: + service(ApplicationService): The service whose state to set. + Returns: + A Deferred which resolves to ApplicationServiceState. + """ + pass + def set_appservice_state(self, service, state): """Set the application service state. @@ -365,6 +375,18 @@ class ApplicationServiceTransactionStore(SQLBaseStore): """ pass + def create_appservice_txn(self, service, events): + """Atomically creates a new transaction for this application service + with the given list of events. + + Args: + service(ApplicationService): The service who the transaction is for. + events(list): A list of events to put in the transaction. + Returns: + ApplicationServiceTransaction: A new transaction. + """ + pass + def complete_appservice_txn(self, txn_id, service): """Completes an application service transaction. diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index ec8f77c54b..a31755da67 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -12,9 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from synapse.appservice import ApplicationServiceState, AppServiceTransaction from synapse.appservice.scheduler import ( - AppServiceScheduler, AppServiceTransaction, _EventGrouper, - _TransactionController, _Recoverer + AppServiceScheduler, _EventGrouper, _TransactionController, _Recoverer ) from twisted.internet import defer from ..utils import MockClock @@ -22,6 +22,116 @@ from mock import Mock from tests import unittest +class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): + + def setUp(self): + self.clock = MockClock() + self.store = Mock() + self.as_api = Mock() + self.event_grouper = Mock() + self.recoverer = Mock() + self.recoverer_fn = Mock(return_value=self.recoverer) + self.txnctrl = _TransactionController( + clock=self.clock, store=self.store, as_api=self.as_api, + event_grouper=self.event_grouper, recoverer_fn=self.recoverer_fn + ) + + def test_poll_single_group_service_up(self): + # Test: The AS is up and the txn is successfully sent. + service = Mock() + events = [Mock(), Mock()] + groups = {} + groups[service] = events + txn_id = "foobar" + txn = Mock(id=txn_id, service=service, events=events) + + # mock methods + self.event_grouper.drain_groups = Mock(return_value=groups) + self.store.get_appservice_state = Mock( + return_value=defer.succeed(ApplicationServiceState.UP) + ) + txn.send = Mock(return_value=defer.succeed(True)) + self.store.create_appservice_txn = Mock( + return_value=defer.succeed(txn) + ) + + # actual call + self.txnctrl.start_polling() + + self.store.create_appservice_txn.assert_called_once_with( + service=service, events=events # txn made and saved + ) + self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made + txn.complete.assert_called_once_with(self.store) # txn completed + + def test_poll_single_group_service_down(self): + # Test: The AS is down so it shouldn't push; Recoverers will do it. + # It should still make a transaction though. + service = Mock() + events = [Mock(), Mock()] + groups = {} + groups[service] = events + + self.event_grouper.drain_groups = Mock(return_value=groups) + txn = Mock(id="idhere", service=service, events=events) + self.store.get_appservice_state = Mock( + return_value=defer.succeed(ApplicationServiceState.DOWN) + ) + self.store.create_appservice_txn = Mock( + return_value=defer.succeed(txn) + ) + + # actual call + self.txnctrl.start_polling() + + self.store.create_appservice_txn.assert_called_once_with( + service=service, events=events # txn made and saved + ) + self.assertEquals(0, txn.send.call_count) # txn not sent though + self.assertEquals(0, txn.complete.call_count) # or completed + + def test_poll_single_group_service_up(self): + # Test: The AS is up and the txn is not sent. A Recoverer is made and + # started. + service = Mock() + events = [Mock(), Mock()] + groups = {} + groups[service] = events + txn_id = "foobar" + txn = Mock(id=txn_id, service=service, events=events) + + # mock methods + self.event_grouper.drain_groups = Mock(return_value=groups) + self.store.get_appservice_state = Mock( + return_value=defer.succeed(ApplicationServiceState.UP) + ) + self.store.set_appservice_state = Mock(return_value=defer.succeed(True)) + txn.send = Mock(return_value=defer.succeed(False)) # fails to send + self.store.create_appservice_txn = Mock( + return_value=defer.succeed(txn) + ) + + # actual call + self.txnctrl.start_polling() + + self.store.create_appservice_txn.assert_called_once_with( + service=service, events=events + ) + self.assertEquals(1, self.recoverer_fn.call_count) # recoverer made + self.assertEquals(1, self.recoverer.recover.call_count) # and invoked + self.assertEquals(1, len(self.txnctrl.recoverers)) # and stored + self.assertEquals(0, txn.complete.call_count) # txn not completed + self.store.set_appservice_state.assert_called_once_with( + service, ApplicationServiceState.DOWN # service marked as down + ) + + def test_poll_no_groups(self): + self.as_api.push_bulk = Mock() + self.event_grouper.drain_groups = Mock(return_value={}) + self.txnctrl.start_polling() + self.assertEquals(0, self.as_api.push_bulk.call_count) + + class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): def setUp(self): @@ -94,6 +204,7 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.assertEquals(1, txn.complete.call_count) self.callback.assert_called_once_with(self.recoverer) + class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): def setUp(self): -- cgit 1.5.1 From 2602ddc379f9bede21cafc8c8f7f57dec44cf69d Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 16:16:14 +0000 Subject: Apply clarity and docstrings --- synapse/appservice/scheduler.py | 2 +- synapse/storage/appservice.py | 14 +++++++++++++- tests/appservice/test_scheduler.py | 2 +- 3 files changed, 15 insertions(+), 3 deletions(-) (limited to 'tests/appservice') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 50ad3b8e83..514148c947 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -224,5 +224,5 @@ class _Recoverer(object): @defer.inlineCallbacks def _get_oldest_txn(self): - txn = yield self.store.get_oldest_txn(self.service) + txn = yield self.store.get_oldest_unsent_txn(self.service) defer.returnValue(txn) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 6fde7dcc66..4447c8a2e1 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -383,7 +383,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): service(ApplicationService): The service who the transaction is for. events(list): A list of events to put in the transaction. Returns: - ApplicationServiceTransaction: A new transaction. + AppServiceTransaction: A new transaction. """ pass @@ -399,3 +399,15 @@ class ApplicationServiceTransactionStore(SQLBaseStore): successfully. """ pass + + def get_oldest_unsent_txn(self, service): + """Get the oldest transaction which has not been sent for this + service. + + Args: + service(ApplicationService): The app service to get the oldest txn. + Returns: + A Deferred which resolves to an AppServiceTransaction or + None. + """ + pass diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index a31755da67..f75a6f5d95 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -14,7 +14,7 @@ # limitations under the License. from synapse.appservice import ApplicationServiceState, AppServiceTransaction from synapse.appservice.scheduler import ( - AppServiceScheduler, _EventGrouper, _TransactionController, _Recoverer + _EventGrouper, _TransactionController, _Recoverer ) from twisted.internet import defer from ..utils import MockClock -- cgit 1.5.1 From 10766f1e93f884d8a71af43f565183a54786a3ca Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 16:17:01 +0000 Subject: Update UTs --- tests/appservice/test_scheduler.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'tests/appservice') diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index f75a6f5d95..9532bf66b8 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -155,18 +155,18 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): def take_txn(*args, **kwargs): return defer.succeed(txns.pop(0)) - self.store.get_oldest_txn = Mock(side_effect=take_txn) + self.store.get_oldest_unsent_txn = Mock(side_effect=take_txn) self.recoverer.recover() # shouldn't have called anything prior to waiting for exp backoff - self.assertEquals(0, self.store.get_oldest_txn.call_count) + self.assertEquals(0, self.store.get_oldest_unsent_txn.call_count) txn.send = Mock(return_value=True) # wait for exp backoff self.clock.advance_time(2000) self.assertEquals(1, txn.send.call_count) self.assertEquals(1, txn.complete.call_count) # 2 because it needs to get None to know there are no more txns - self.assertEquals(2, self.store.get_oldest_txn.call_count) + self.assertEquals(2, self.store.get_oldest_unsent_txn.call_count) self.callback.assert_called_once_with(self.recoverer) self.assertEquals(self.recoverer.service, self.service) @@ -180,10 +180,10 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): return defer.succeed(txns.pop(0)) else: return defer.succeed(txn) - self.store.get_oldest_txn = Mock(side_effect=take_txn) + self.store.get_oldest_unsent_txn = Mock(side_effect=take_txn) self.recoverer.recover() - self.assertEquals(0, self.store.get_oldest_txn.call_count) + self.assertEquals(0, self.store.get_oldest_unsent_txn.call_count) txn.send = Mock(return_value=False) self.clock.advance_time(2000) self.assertEquals(1, txn.send.call_count) -- cgit 1.5.1 From 21fd84dcb8645a555cc35adb8b2a5a68536b8087 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 9 Mar 2015 17:01:19 +0000 Subject: Use seconds; start gluing in the AS scheduler into the AS handler. --- synapse/appservice/scheduler.py | 4 ++-- synapse/handlers/__init__.py | 8 +++++++- synapse/handlers/appservice.py | 17 ++++++++++++++--- synapse/storage/__init__.py | 7 +++++-- tests/appservice/test_scheduler.py | 10 +++++----- tests/handlers/test_appservice.py | 7 +++++-- 6 files changed, 38 insertions(+), 15 deletions(-) (limited to 'tests/appservice') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index ee5978da6e..068d4bd087 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -132,7 +132,7 @@ class _TransactionController(object): txn.complete(self.store) else: self._start_recoverer(service) - self.clock.call_later(1000, self.start_polling) + self.clock.call_later(1, self.start_polling) @defer.inlineCallbacks def on_recovered(self, recoverer): @@ -202,7 +202,7 @@ class _Recoverer(object): self.backoff_counter = 1 def recover(self): - self.clock.call_later(1000 * (2 ** self.backoff_counter), self.retry) + self.clock.call_later((2 ** self.backoff_counter), self.retry) @defer.inlineCallbacks def retry(self): diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 8d345bf936..0c51d615ec 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.appservice.scheduler import AppServiceScheduler from synapse.appservice.api import ApplicationServiceApi from .register import RegistrationHandler from .room import ( @@ -54,7 +55,12 @@ class Handlers(object): self.directory_handler = DirectoryHandler(hs) self.typing_notification_handler = TypingNotificationHandler(hs) self.admin_handler = AdminHandler(hs) + asapi = ApplicationServiceApi(hs) self.appservice_handler = ApplicationServicesHandler( - hs, ApplicationServiceApi(hs) + hs, asapi, AppServiceScheduler( + clock=hs.get_clock(), + store=hs.get_datastore(), + as_api=asapi + ) ) self.sync_handler = SyncHandler(hs) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 2c488a46f6..f3cd458e6b 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -26,15 +26,22 @@ import logging logger = logging.getLogger(__name__) +def log_failure(failure): + logger.error("Application Services Failure: %s", failure.value) + logger.error(failure.getTraceback()) + + # NB: Purposefully not inheriting BaseHandler since that contains way too much # setup code which this handler does not need or use. This makes testing a lot # easier. class ApplicationServicesHandler(object): - def __init__(self, hs, appservice_api): + def __init__(self, hs, appservice_api, appservice_scheduler): self.store = hs.get_datastore() self.hs = hs self.appservice_api = appservice_api + self.scheduler = appservice_scheduler + self.started_scheduler = False @defer.inlineCallbacks def register(self, app_service): @@ -90,9 +97,13 @@ class ApplicationServicesHandler(object): if event.type == EventTypes.Member: yield self._check_user_exists(event.state_key) - # Fork off pushes to these services - XXX First cut, best effort + if not self.started_scheduler: + self.scheduler.start().addErrback(log_failure) + self.started_scheduler = True + + # Fork off pushes to these services for service in services: - self.appservice_api.push(service, event) + self.scheduler.submit_event_for_as(service, event) @defer.inlineCallbacks def query_user_exists(self, user_id): diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index dfce5224a9..6c159b52a0 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -18,7 +18,9 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.api.constants import EventTypes -from .appservice import ApplicationServiceStore +from .appservice import ( + ApplicationServiceStore, ApplicationServiceTransactionStore +) from .directory import DirectoryStore from .feedback import FeedbackStore from .presence import PresenceStore @@ -79,7 +81,8 @@ class DataStore(RoomMemberStore, RoomStore, RejectionsStore, FilteringStore, PusherStore, - PushRuleStore + PushRuleStore, + ApplicationServiceTransactionStore ): def __init__(self, hs): diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 9532bf66b8..e18e879319 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -162,7 +162,7 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.assertEquals(0, self.store.get_oldest_unsent_txn.call_count) txn.send = Mock(return_value=True) # wait for exp backoff - self.clock.advance_time(2000) + self.clock.advance_time(2) self.assertEquals(1, txn.send.call_count) self.assertEquals(1, txn.complete.call_count) # 2 because it needs to get None to know there are no more txns @@ -185,21 +185,21 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.recoverer.recover() self.assertEquals(0, self.store.get_oldest_unsent_txn.call_count) txn.send = Mock(return_value=False) - self.clock.advance_time(2000) + self.clock.advance_time(2) self.assertEquals(1, txn.send.call_count) self.assertEquals(0, txn.complete.call_count) self.assertEquals(0, self.callback.call_count) - self.clock.advance_time(4000) + self.clock.advance_time(4) self.assertEquals(2, txn.send.call_count) self.assertEquals(0, txn.complete.call_count) self.assertEquals(0, self.callback.call_count) - self.clock.advance_time(8000) + self.clock.advance_time(8) self.assertEquals(3, txn.send.call_count) self.assertEquals(0, txn.complete.call_count) self.assertEquals(0, self.callback.call_count) txn.send = Mock(return_value=True) # successfully send the txn pop_txn = True # returns the txn the first time, then no more. - self.clock.advance_time(16000) + self.clock.advance_time(16) self.assertEquals(1, txn.send.call_count) # new mock reset call count self.assertEquals(1, txn.complete.call_count) self.callback.assert_called_once_with(self.recoverer) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index a2c541317c..06cb1dd4cf 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -27,10 +27,11 @@ class AppServiceHandlerTestCase(unittest.TestCase): def setUp(self): self.mock_store = Mock() self.mock_as_api = Mock() + self.mock_scheduler = Mock() hs = Mock() hs.get_datastore = Mock(return_value=self.mock_store) self.handler = ApplicationServicesHandler( - hs, self.mock_as_api + hs, self.mock_as_api, self.mock_scheduler ) @defer.inlineCallbacks @@ -52,7 +53,9 @@ class AppServiceHandlerTestCase(unittest.TestCase): ) self.mock_as_api.push = Mock() yield self.handler.notify_interested_services(event) - self.mock_as_api.push.assert_called_once_with(interested_service, event) + self.mock_scheduler.submit_event_for_as.assert_called_once_with( + interested_service, event + ) @defer.inlineCallbacks def test_query_room_alias_exists(self): -- cgit 1.5.1 From 835e01fc7047e34a813936544027596627a112df Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 16 Mar 2015 10:16:59 +0000 Subject: Minor PR comment tweaks. --- synapse/appservice/scheduler.py | 4 ++-- synapse/handlers/appservice.py | 10 ++++++++-- synapse/storage/__init__.py | 2 +- synapse/storage/appservice.py | 6 +++--- tests/appservice/test_scheduler.py | 10 +++++----- 5 files changed, 19 insertions(+), 13 deletions(-) (limited to 'tests/appservice') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index add1e3879c..8a3a6a880f 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -86,7 +86,7 @@ class AppServiceScheduler(object): self.txn_ctrl.start_polling() def submit_event_for_as(self, service, event): - self.event_grouper.on_receive(service, event) + self.event_grouper.enqueue(service, event) class _EventGrouper(object): @@ -96,7 +96,7 @@ class _EventGrouper(object): def __init__(self): self.groups = {} # dict of {service: [events]} - def on_receive(self, service, event): + def enqueue(self, service, event): if service not in self.groups: self.groups[service] = [] self.groups[service].append(event) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index f3cd458e6b..a24f7f5587 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -27,8 +27,14 @@ logger = logging.getLogger(__name__) def log_failure(failure): - logger.error("Application Services Failure: %s", failure.value) - logger.error(failure.getTraceback()) + logger.error( + "Application Services Failure", + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject() + ) + ) # NB: Purposefully not inheriting BaseHandler since that contains way too much diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index efef859214..e752b035e6 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -82,7 +82,7 @@ class DataStore(RoomMemberStore, RoomStore, FilteringStore, PusherStore, PushRuleStore, - ApplicationServiceTransactionStore + ApplicationServiceTransactionStore, ): def __init__(self, hs): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 670e1d56af..e928812bc9 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -365,9 +365,9 @@ class ApplicationServiceTransactionStore(SQLBaseStore): may be empty. """ sql = ( - "SELECT r.*, a.* FROM application_services_state AS s LEFT JOIN " - "application_services AS a ON a.id=s.as_id LEFT JOIN " - "application_services_regex AS r ON r.as_id=a.id WHERE state = ?" + "SELECT r.*, a.* FROM application_services_state AS s LEFT JOIN" + " application_services AS a ON a.id=s.as_id LEFT JOIN" + " application_services_regex AS r ON r.as_id=a.id WHERE state = ?" ) results = yield self._execute_and_decode( "get_appservices_by_state", sql, state diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index e18e879319..4534d05b93 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -213,7 +213,7 @@ class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): def test_drain_single_event(self): service = Mock() event = Mock() - self.grouper.on_receive(service, event) + self.grouper.enqueue(service, event) groups = self.grouper.drain_groups() self.assertTrue(service in groups) self.assertEquals([event], groups[service]) @@ -225,7 +225,7 @@ class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): service = Mock() events = [Mock(), Mock(), Mock()] for e in events: - self.grouper.on_receive(service, e) + self.grouper.enqueue(service, e) groups = self.grouper.drain_groups() self.assertTrue(service in groups) self.assertEquals(events, groups[service]) @@ -243,11 +243,11 @@ class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): services[2]: events_c } for e in events_b: - self.grouper.on_receive(services[1], e) + self.grouper.enqueue(services[1], e) for e in events_c: - self.grouper.on_receive(services[2], e) + self.grouper.enqueue(services[2], e) for e in events_a: - self.grouper.on_receive(services[0], e) + self.grouper.enqueue(services[0], e) groups = self.grouper.drain_groups() for service in services: -- cgit 1.5.1 From 6279285b2ad59cf003b2e8d73d30dc706e1f3e4a Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 16 Mar 2015 13:15:40 +0000 Subject: Replace EventGrouper for ServiceQueuer to move to push-based txns. Fix tests and add stub tests for ServiceQueuer. --- synapse/appservice/scheduler.py | 61 +++++++++++----------- tests/appservice/test_scheduler.py | 100 ++++++++++--------------------------- 2 files changed, 60 insertions(+), 101 deletions(-) (limited to 'tests/appservice') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 59a870e271..54c42d1b94 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -16,11 +16,11 @@ This module controls the reliability for application service transactions. The nominal flow through this module looks like: - _________ ----ASa[e]-->| Event | -----ASb[e]->| Grouper |<-poll 1/s--+ ---ASa[e]--->|_________| | ASa[e,e] ASb[e] - V + __________ +1---ASa[e]-->| Service |--> Queue ASa[f] +2----ASb[e]->| Queuer | +3--ASa[f]--->|__________|-----------+ ASa[e], ASb[e] + V -````````- +------------+ |````````|<--StoreTxn-|Transaction | |Database| | Controller |---> SEND TO AS @@ -66,14 +66,14 @@ class AppServiceScheduler(object): self.clock = clock self.store = store self.as_api = as_api - self.event_grouper = _EventGrouper() def create_recoverer(service, callback): return _Recoverer(clock, store, as_api, service, callback) self.txn_ctrl = _TransactionController( - clock, store, as_api, self.event_grouper, create_recoverer + clock, store, as_api, create_recoverer ) + self.queuer = _ServiceQueuer(self.txn_ctrl) @defer.inlineCallbacks def start(self): @@ -86,17 +86,26 @@ class AppServiceScheduler(object): self.txn_ctrl.start_polling() def submit_event_for_as(self, service, event): - self.event_grouper.enqueue(service, event) + self.queuer.enqueue(service, event) -class _EventGrouper(object): - """Groups events for the same application service together. +class _ServiceQueuer(object): + """Queues events for the same application service together, sending + transactions as soon as possible. Once a transaction is sent successfully, + this schedules any other events in the queue to run. """ - def __init__(self): + def __init__(self, txn_ctrl): self.groups = {} # dict of {service: [events]} + self.txn_ctrl = txn_ctrl def enqueue(self, service, event): + # if nothing in queue for this service, send event immediately and add + # callbacks. + self.txn_ctrl.send(service, [event]) + + # else add to queue for this service + if service not in self.groups: self.groups[service] = [] self.groups[service].append(event) @@ -109,34 +118,30 @@ class _EventGrouper(object): class _TransactionController(object): - def __init__(self, clock, store, as_api, event_grouper, recoverer_fn): + def __init__(self, clock, store, as_api, recoverer_fn): self.clock = clock self.store = store self.as_api = as_api - self.event_grouper = event_grouper self.recoverer_fn = recoverer_fn # keep track of how many recoverers there are self.recoverers = [] @defer.inlineCallbacks - def start_polling(self): + def send(self, service, events): try: - groups = self.event_grouper.drain_groups() - for service in groups: - txn = yield self.store.create_appservice_txn( - service=service, - events=groups[service] - ) - service_is_up = yield self._is_service_up(service) - if service_is_up: - sent = yield txn.send(self.as_api) - if sent: - txn.complete(self.store) - else: - self._start_recoverer(service) + txn = yield self.store.create_appservice_txn( + service=service, + events=events + ) + service_is_up = yield self._is_service_up(service) + if service_is_up: + sent = yield txn.send(self.as_api) + if sent: + txn.complete(self.store) + else: + self._start_recoverer(service) except Exception as e: logger.exception(e) - self.clock.call_later(1, self.start_polling) @defer.inlineCallbacks def on_recovered(self, recoverer): diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 4534d05b93..38d792eb02 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -14,7 +14,7 @@ # limitations under the License. from synapse.appservice import ApplicationServiceState, AppServiceTransaction from synapse.appservice.scheduler import ( - _EventGrouper, _TransactionController, _Recoverer + _ServiceQueuer, _TransactionController, _Recoverer ) from twisted.internet import defer from ..utils import MockClock @@ -28,25 +28,21 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.clock = MockClock() self.store = Mock() self.as_api = Mock() - self.event_grouper = Mock() self.recoverer = Mock() self.recoverer_fn = Mock(return_value=self.recoverer) self.txnctrl = _TransactionController( clock=self.clock, store=self.store, as_api=self.as_api, - event_grouper=self.event_grouper, recoverer_fn=self.recoverer_fn + recoverer_fn=self.recoverer_fn ) - def test_poll_single_group_service_up(self): + def test_single_service_up_txn_sent(self): # Test: The AS is up and the txn is successfully sent. service = Mock() events = [Mock(), Mock()] - groups = {} - groups[service] = events txn_id = "foobar" txn = Mock(id=txn_id, service=service, events=events) # mock methods - self.event_grouper.drain_groups = Mock(return_value=groups) self.store.get_appservice_state = Mock( return_value=defer.succeed(ApplicationServiceState.UP) ) @@ -56,7 +52,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): ) # actual call - self.txnctrl.start_polling() + self.txnctrl.send(service, events) self.store.create_appservice_txn.assert_called_once_with( service=service, events=events # txn made and saved @@ -64,15 +60,12 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made txn.complete.assert_called_once_with(self.store) # txn completed - def test_poll_single_group_service_down(self): + def test_single_service_down(self): # Test: The AS is down so it shouldn't push; Recoverers will do it. # It should still make a transaction though. service = Mock() events = [Mock(), Mock()] - groups = {} - groups[service] = events - self.event_grouper.drain_groups = Mock(return_value=groups) txn = Mock(id="idhere", service=service, events=events) self.store.get_appservice_state = Mock( return_value=defer.succeed(ApplicationServiceState.DOWN) @@ -82,7 +75,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): ) # actual call - self.txnctrl.start_polling() + self.txnctrl.send(service, events) self.store.create_appservice_txn.assert_called_once_with( service=service, events=events # txn made and saved @@ -90,18 +83,15 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.assertEquals(0, txn.send.call_count) # txn not sent though self.assertEquals(0, txn.complete.call_count) # or completed - def test_poll_single_group_service_up(self): + def test_single_service_up_txn_not_sent(self): # Test: The AS is up and the txn is not sent. A Recoverer is made and # started. service = Mock() events = [Mock(), Mock()] - groups = {} - groups[service] = events txn_id = "foobar" txn = Mock(id=txn_id, service=service, events=events) # mock methods - self.event_grouper.drain_groups = Mock(return_value=groups) self.store.get_appservice_state = Mock( return_value=defer.succeed(ApplicationServiceState.UP) ) @@ -112,7 +102,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): ) # actual call - self.txnctrl.start_polling() + self.txnctrl.send(service, events) self.store.create_appservice_txn.assert_called_once_with( service=service, events=events @@ -125,12 +115,6 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): service, ApplicationServiceState.DOWN # service marked as down ) - def test_poll_no_groups(self): - self.as_api.push_bulk = Mock() - self.event_grouper.drain_groups = Mock(return_value={}) - self.txnctrl.start_polling() - self.assertEquals(0, self.as_api.push_bulk.call_count) - class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): @@ -205,54 +189,24 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.callback.assert_called_once_with(self.recoverer) -class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): +class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase): def setUp(self): - self.grouper = _EventGrouper() - - def test_drain_single_event(self): - service = Mock() - event = Mock() - self.grouper.enqueue(service, event) - groups = self.grouper.drain_groups() - self.assertTrue(service in groups) - self.assertEquals([event], groups[service]) - self.assertEquals(1, len(groups.keys())) - # no more events - self.assertEquals(self.grouper.drain_groups(), {}) - - def test_drain_multiple_events(self): - service = Mock() - events = [Mock(), Mock(), Mock()] - for e in events: - self.grouper.enqueue(service, e) - groups = self.grouper.drain_groups() - self.assertTrue(service in groups) - self.assertEquals(events, groups[service]) - # no more events - self.assertEquals(self.grouper.drain_groups(), {}) - - def test_drain_multiple_services(self): - services = [Mock(), Mock(), Mock()] - events_a = [Mock(), Mock()] - events_b = [Mock()] - events_c = [Mock(), Mock(), Mock(), Mock()] - mappings = { - services[0]: events_a, - services[1]: events_b, - services[2]: events_c - } - for e in events_b: - self.grouper.enqueue(services[1], e) - for e in events_c: - self.grouper.enqueue(services[2], e) - for e in events_a: - self.grouper.enqueue(services[0], e) - - groups = self.grouper.drain_groups() - for service in services: - self.assertTrue(service in groups) - self.assertEquals(mappings[service], groups[service]) - self.assertEquals(3, len(groups.keys())) - # no more events - self.assertEquals(self.grouper.drain_groups(), {}) + self.txn_ctrl = Mock() + self.queuer = _ServiceQueuer(self.txn_ctrl) + + def test_send_single_event_no_queue(self): + # Expect the event to be sent immediately. + pass + + def test_send_single_event_with_queue(self): + # - Send an event and don't resolve it just yet. + # - Send another event: expect send() to NOT be called. + # - Resolve the send event + # - Expect queued event to be sent + pass + + def test_multiple_service_queues(self): + # Tests that each service has its own queue, and that they don't block + # on each other. + pass -- cgit 1.5.1 From d04fa1f7121d996e05bd4def14951d89eb47d1ab Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 16 Mar 2015 14:03:16 +0000 Subject: Implement ServiceQueuer with tests. --- synapse/appservice/scheduler.py | 46 +++++++++++++++++++++----------- tests/appservice/test_scheduler.py | 54 +++++++++++++++++++++++++++++++++----- 2 files changed, 77 insertions(+), 23 deletions(-) (limited to 'tests/appservice') diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 54c42d1b94..3cedd479a2 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -83,7 +83,6 @@ class AppServiceScheduler(object): self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered ) self.txn_ctrl.add_recoverers(recoverers) - self.txn_ctrl.start_polling() def submit_event_for_as(self, service, event): self.queuer.enqueue(service, event) @@ -96,24 +95,37 @@ class _ServiceQueuer(object): """ def __init__(self, txn_ctrl): - self.groups = {} # dict of {service: [events]} + self.queued_events = {} # dict of {service_id: [events]} + self.pending_requests = {} # dict of {service_id: Deferred} self.txn_ctrl = txn_ctrl def enqueue(self, service, event): - # if nothing in queue for this service, send event immediately and add - # callbacks. - self.txn_ctrl.send(service, [event]) - - # else add to queue for this service - - if service not in self.groups: - self.groups[service] = [] - self.groups[service].append(event) - - def drain_groups(self): - groups = self.groups - self.groups = {} - return groups + # if this service isn't being sent something + if not self.pending_requests.get(service.id): + self._send_request(service, [event]) + else: + # add to queue for this service + if service.id not in self.queued_events: + self.queued_events[service.id] = [] + self.queued_events[service.id].append(event) + + def _send_request(self, service, events): + # send request and add callbacks + d = self.txn_ctrl.send(service, events) + d.addCallback(self._on_request_finish) + d.addErrback(self._on_request_fail) + self.pending_requests[service.id] = d + + def _on_request_finish(self, service): + self.pending_requests[service.id] = None + # if there are queued events, then send them. + if (service.id in self.queued_events + and len(self.queued_events[service.id]) > 0): + self._send_request(service, self.queued_events[service.id]) + self.queued_events[service.id] = [] + + def _on_request_fail(self, err): + logger.error("AS request failed: %s", err) class _TransactionController(object): @@ -142,6 +154,8 @@ class _TransactionController(object): self._start_recoverer(service) except Exception as e: logger.exception(e) + # request has finished + defer.returnValue(service) @defer.inlineCallbacks def on_recovered(self, recoverer): diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 38d792eb02..82a5965097 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -197,16 +197,56 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase): def test_send_single_event_no_queue(self): # Expect the event to be sent immediately. - pass + service = Mock(id=4) + event = Mock() + self.queuer.enqueue(service, event) + self.txn_ctrl.send.assert_called_once_with(service, [event]) def test_send_single_event_with_queue(self): - # - Send an event and don't resolve it just yet. - # - Send another event: expect send() to NOT be called. - # - Resolve the send event - # - Expect queued event to be sent - pass + d = defer.Deferred() + self.txn_ctrl.send = Mock(return_value=d) + service = Mock(id=4) + event = Mock(event_id="first") + event2 = Mock(event_id="second") + event3 = Mock(event_id="third") + # Send an event and don't resolve it just yet. + self.queuer.enqueue(service, event) + # Send more events: expect send() to NOT be called multiple times. + self.queuer.enqueue(service, event2) + self.queuer.enqueue(service, event3) + self.txn_ctrl.send.assert_called_with(service, [event]) + self.assertEquals(1, self.txn_ctrl.send.call_count) + # Resolve the send event: expect the queued events to be sent + d.callback(service) + self.txn_ctrl.send.assert_called_with(service, [event2, event3]) + self.assertEquals(2, self.txn_ctrl.send.call_count) def test_multiple_service_queues(self): # Tests that each service has its own queue, and that they don't block # on each other. - pass + srv1 = Mock(id=4) + srv_1_defer = defer.Deferred() + srv_1_event = Mock(event_id="srv1a") + srv_1_event2 = Mock(event_id="srv1b") + + srv2 = Mock(id=6) + srv_2_defer = defer.Deferred() + srv_2_event = Mock(event_id="srv2a") + srv_2_event2 = Mock(event_id="srv2b") + + send_return_list = [srv_1_defer, srv_2_defer] + self.txn_ctrl.send = Mock(side_effect=lambda x,y: send_return_list.pop(0)) + + # send events for different ASes and make sure they are sent + self.queuer.enqueue(srv1, srv_1_event) + self.queuer.enqueue(srv1, srv_1_event2) + self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event]) + self.queuer.enqueue(srv2, srv_2_event) + self.queuer.enqueue(srv2, srv_2_event2) + self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event]) + + # make sure callbacks for a service only send queued events for THAT + # service + srv_2_defer.callback(srv2) + self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2]) + self.assertEquals(3, self.txn_ctrl.send.call_count) -- cgit 1.5.1 From 09cbff174a01757d10107b7960972a484153323e Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 31 Mar 2015 16:43:49 +0100 Subject: Fix thinko whereby events *for the AS specifically* were not passed on. This was caused by not explicitly checking the service.sender field. This has now been fixed and a regression test has been added. --- synapse/appservice/__init__.py | 5 ++++- tests/appservice/test_appservice.py | 13 +++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) (limited to 'tests/appservice') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index c60db16b74..4a6cdbc2be 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -199,7 +199,10 @@ class ApplicationService(object): return self._matches_user(event, member_list) def is_interested_in_user(self, user_id): - return self._matches_regex(user_id, ApplicationService.NS_USERS) + return ( + self._matches_regex(user_id, ApplicationService.NS_USERS) + or user_id == self.sender + ) def is_interested_in_alias(self, alias): return self._matches_regex(alias, ApplicationService.NS_ALIASES) diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py index eb7becf725..62149d6902 100644 --- a/tests/appservice/test_appservice.py +++ b/tests/appservice/test_appservice.py @@ -199,6 +199,19 @@ class ApplicationServiceTestCase(unittest.TestCase): aliases_for_event=["#xmpp_barfoo:matrix.org"] )) + def test_interested_in_self(self): + # make sure invites get through + self.service.sender = "@appservice:name" + self.service.namespaces[ApplicationService.NS_USERS].append( + _regex("@irc_.*") + ) + self.event.type = "m.room.member" + self.event.content = { + "membership": "invite" + } + self.event.state_key = self.service.sender + self.assertTrue(self.service.is_interested(self.event)) + def test_member_list_match(self): self.service.namespaces[ApplicationService.NS_USERS].append( _regex("@irc_.*") -- cgit 1.5.1