From f260cb72cd3435d540411962a92ca2a9fd333eb1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 15:12:24 +0000 Subject: Flesh out more stub functions. --- synapse/appservice/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'synapse/appservice/__init__.py') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index a268a6bcc4..cc6c381566 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -20,6 +20,11 @@ import re logger = logging.getLogger(__name__) +class ApplicationServiceState(object): + DOWN = "down" + UP = "up" + + class ApplicationService(object): """Defines an application service. This definition is mostly what is provided to the /register AS API. -- cgit 1.5.1 From 0354659f9d8b60b9edc78b0b597bceb52b8c7b2b Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 16:09:05 +0000 Subject: Finish synapse.appservice.scheduler implementation. With tests to assert behaviour. Not hooked up yet. Stub datastore methods not implemented yet. --- synapse/appservice/__init__.py | 39 +++++++++++++ synapse/appservice/scheduler.py | 63 ++++---------------- synapse/storage/appservice.py | 22 +++++++ tests/appservice/test_scheduler.py | 115 ++++++++++++++++++++++++++++++++++++- 4 files changed, 186 insertions(+), 53 deletions(-) (limited to 'synapse/appservice/__init__.py') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index cc6c381566..743a8278ad 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -25,6 +25,45 @@ class ApplicationServiceState(object): UP = "up" +class AppServiceTransaction(object): + """Represents an application service transaction.""" + + def __init__(self, service, id, events): + self.service = service + self.id = id + self.events = events + + def send(self, as_api): + """Sends this transaction using the provided AS API interface. + + Args: + as_api(ApplicationServiceApi): The API to use to send. + Returns: + A Deferred which resolves to True if the transaction was sent. + """ + return as_api.push_bulk( + service=self.service, + events=self.events, + txn_id=self.id + ) + + def complete(self, store): + """Completes this transaction as successful. + + Marks this transaction ID on the application service and removes the + transaction contents from the database. + + Args: + store: The database store to operate on. + Returns: + A Deferred which resolves to True if the transaction was completed. + """ + return store.complete_appservice_txn( + service=self.service, + txn_id=self.id + ) + + class ApplicationService(object): """Defines an application service. This definition is mostly what is provided to the /register AS API. diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 2b3aa3b0ea..50ad3b8e83 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -88,45 +88,6 @@ class AppServiceScheduler(object): self.event_grouper.on_receive(service, event) -class AppServiceTransaction(object): - """Represents an application service transaction.""" - - def __init__(self, service, id, events): - self.service = service - self.id = id - self.events = events - - def send(self, as_api): - """Sends this transaction using the provided AS API interface. - - Args: - as_api(ApplicationServiceApi): The API to use to send. - Returns: - A Deferred which resolves to True if the transaction was sent. - """ - return as_api.push_bulk( - service=self.service, - events=self.events, - txn_id=self.id - ) - - def complete(self, store): - """Completes this transaction as successful. - - Marks this transaction ID on the application service and removes the - transaction contents from the database. - - Args: - store: The database store to operate on. - Returns: - A Deferred which resolves to True if the transaction was completed. - """ - return store.complete_appservice_txn( - service=self.service, - txn_id=self.id - ) - - class _EventGrouper(object): """Groups events for the same application service together. """ @@ -156,14 +117,18 @@ class _TransactionController(object): # keep track of how many recoverers there are self.recoverers = [] + @defer.inlineCallbacks def start_polling(self): groups = self.event_grouper.drain_groups() for service in groups: - txn_id = self._get_next_txn_id(service) - txn = AppServiceTransaction(service, txn_id, groups[service]) - self._store_txn(txn) - if self._is_service_up(service): - if txn.send(self.as_api): + txn = yield self.store.create_appservice_txn( + service=service, + events=groups[service] + ) + service_is_up = yield self._is_service_up(service) + if service_is_up: + sent = yield txn.send(self.as_api) + if sent: txn.complete(self.store) else: self._start_recoverer(service) @@ -207,14 +172,10 @@ class _TransactionController(object): logger.error("Failed to apply appservice state DOWN to service %s", service) + @defer.inlineCallbacks def _is_service_up(self, service): - pass - - def _get_next_txn_id(self, service): - pass # TODO work out the next txn_id for this service - - def _store_txn(self, txn): - pass + state = yield self.store.get_appservice_state(service) + defer.returnValue(state == ApplicationServiceState.UP) class _Recoverer(object): diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 214f6d99c5..6fde7dcc66 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -354,6 +354,16 @@ class ApplicationServiceTransactionStore(SQLBaseStore): """ pass + def get_appservice_state(self, service): + """Get the application service state. + + Args: + service(ApplicationService): The service whose state to set. + Returns: + A Deferred which resolves to ApplicationServiceState. + """ + pass + def set_appservice_state(self, service, state): """Set the application service state. @@ -365,6 +375,18 @@ class ApplicationServiceTransactionStore(SQLBaseStore): """ pass + def create_appservice_txn(self, service, events): + """Atomically creates a new transaction for this application service + with the given list of events. + + Args: + service(ApplicationService): The service who the transaction is for. + events(list): A list of events to put in the transaction. + Returns: + ApplicationServiceTransaction: A new transaction. + """ + pass + def complete_appservice_txn(self, txn_id, service): """Completes an application service transaction. diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index ec8f77c54b..a31755da67 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -12,9 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from synapse.appservice import ApplicationServiceState, AppServiceTransaction from synapse.appservice.scheduler import ( - AppServiceScheduler, AppServiceTransaction, _EventGrouper, - _TransactionController, _Recoverer + AppServiceScheduler, _EventGrouper, _TransactionController, _Recoverer ) from twisted.internet import defer from ..utils import MockClock @@ -22,6 +22,116 @@ from mock import Mock from tests import unittest +class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): + + def setUp(self): + self.clock = MockClock() + self.store = Mock() + self.as_api = Mock() + self.event_grouper = Mock() + self.recoverer = Mock() + self.recoverer_fn = Mock(return_value=self.recoverer) + self.txnctrl = _TransactionController( + clock=self.clock, store=self.store, as_api=self.as_api, + event_grouper=self.event_grouper, recoverer_fn=self.recoverer_fn + ) + + def test_poll_single_group_service_up(self): + # Test: The AS is up and the txn is successfully sent. + service = Mock() + events = [Mock(), Mock()] + groups = {} + groups[service] = events + txn_id = "foobar" + txn = Mock(id=txn_id, service=service, events=events) + + # mock methods + self.event_grouper.drain_groups = Mock(return_value=groups) + self.store.get_appservice_state = Mock( + return_value=defer.succeed(ApplicationServiceState.UP) + ) + txn.send = Mock(return_value=defer.succeed(True)) + self.store.create_appservice_txn = Mock( + return_value=defer.succeed(txn) + ) + + # actual call + self.txnctrl.start_polling() + + self.store.create_appservice_txn.assert_called_once_with( + service=service, events=events # txn made and saved + ) + self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made + txn.complete.assert_called_once_with(self.store) # txn completed + + def test_poll_single_group_service_down(self): + # Test: The AS is down so it shouldn't push; Recoverers will do it. + # It should still make a transaction though. + service = Mock() + events = [Mock(), Mock()] + groups = {} + groups[service] = events + + self.event_grouper.drain_groups = Mock(return_value=groups) + txn = Mock(id="idhere", service=service, events=events) + self.store.get_appservice_state = Mock( + return_value=defer.succeed(ApplicationServiceState.DOWN) + ) + self.store.create_appservice_txn = Mock( + return_value=defer.succeed(txn) + ) + + # actual call + self.txnctrl.start_polling() + + self.store.create_appservice_txn.assert_called_once_with( + service=service, events=events # txn made and saved + ) + self.assertEquals(0, txn.send.call_count) # txn not sent though + self.assertEquals(0, txn.complete.call_count) # or completed + + def test_poll_single_group_service_up(self): + # Test: The AS is up and the txn is not sent. A Recoverer is made and + # started. + service = Mock() + events = [Mock(), Mock()] + groups = {} + groups[service] = events + txn_id = "foobar" + txn = Mock(id=txn_id, service=service, events=events) + + # mock methods + self.event_grouper.drain_groups = Mock(return_value=groups) + self.store.get_appservice_state = Mock( + return_value=defer.succeed(ApplicationServiceState.UP) + ) + self.store.set_appservice_state = Mock(return_value=defer.succeed(True)) + txn.send = Mock(return_value=defer.succeed(False)) # fails to send + self.store.create_appservice_txn = Mock( + return_value=defer.succeed(txn) + ) + + # actual call + self.txnctrl.start_polling() + + self.store.create_appservice_txn.assert_called_once_with( + service=service, events=events + ) + self.assertEquals(1, self.recoverer_fn.call_count) # recoverer made + self.assertEquals(1, self.recoverer.recover.call_count) # and invoked + self.assertEquals(1, len(self.txnctrl.recoverers)) # and stored + self.assertEquals(0, txn.complete.call_count) # txn not completed + self.store.set_appservice_state.assert_called_once_with( + service, ApplicationServiceState.DOWN # service marked as down + ) + + def test_poll_no_groups(self): + self.as_api.push_bulk = Mock() + self.event_grouper.drain_groups = Mock(return_value={}) + self.txnctrl.start_polling() + self.assertEquals(0, self.as_api.push_bulk.call_count) + + class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): def setUp(self): @@ -94,6 +204,7 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.assertEquals(1, txn.complete.call_count) self.callback.assert_called_once_with(self.recoverer) + class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): def setUp(self): -- cgit 1.5.1 From 4a6afa6abf6c90c393bc3fa00e40d3927fc0c6c1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 6 Mar 2015 17:27:55 +0000 Subject: Assign the AS ID from the database; replace old placeholder txn id. --- synapse/appservice/__init__.py | 4 ++-- synapse/storage/appservice.py | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'synapse/appservice/__init__.py') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 743a8278ad..c60db16b74 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -79,13 +79,13 @@ class ApplicationService(object): NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS] def __init__(self, token, url=None, namespaces=None, hs_token=None, - sender=None, txn_id=None): + sender=None, id=None): self.token = token self.url = url self.hs_token = hs_token self.sender = sender self.namespaces = self._check_namespaces(namespaces) - self.txn_id = txn_id + self.id = id def _check_namespaces(self, namespaces): # Sanity check that it is of the form: diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index eec8fbd592..582269b8d5 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -302,6 +302,7 @@ class ApplicationServiceStore(SQLBaseStore): if as_token not in services: # add the service services[as_token] = { + "id": res["as_id"], "url": res["url"], "token": as_token, "hs_token": res["hs_token"], @@ -326,7 +327,6 @@ class ApplicationServiceStore(SQLBaseStore): except JSONDecodeError: logger.error("Bad regex object '%s'", res["regex"]) - # TODO get last successful txn id f.e. service for service in services.values(): logger.info("Found application service: %s", service) self.services_cache.append(ApplicationService( @@ -334,7 +334,8 @@ class ApplicationServiceStore(SQLBaseStore): url=service["url"], namespaces=service["namespaces"], hs_token=service["hs_token"], - sender=service["sender"] + sender=service["sender"], + id=service["id"] )) -- cgit 1.5.1