diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index cc6c381566..743a8278ad 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -25,6 +25,45 @@ class ApplicationServiceState(object):
UP = "up"
+class AppServiceTransaction(object):
+ """Represents an application service transaction."""
+
+ def __init__(self, service, id, events):
+ self.service = service
+ self.id = id
+ self.events = events
+
+ def send(self, as_api):
+ """Sends this transaction using the provided AS API interface.
+
+ Args:
+ as_api(ApplicationServiceApi): The API to use to send.
+ Returns:
+ A Deferred which resolves to True if the transaction was sent.
+ """
+ return as_api.push_bulk(
+ service=self.service,
+ events=self.events,
+ txn_id=self.id
+ )
+
+ def complete(self, store):
+ """Completes this transaction as successful.
+
+ Marks this transaction ID on the application service and removes the
+ transaction contents from the database.
+
+ Args:
+ store: The database store to operate on.
+ Returns:
+ A Deferred which resolves to True if the transaction was completed.
+ """
+ return store.complete_appservice_txn(
+ service=self.service,
+ txn_id=self.id
+ )
+
+
class ApplicationService(object):
"""Defines an application service. This definition is mostly what is
provided to the /register AS API.
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 2b3aa3b0ea..50ad3b8e83 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -88,45 +88,6 @@ class AppServiceScheduler(object):
self.event_grouper.on_receive(service, event)
-class AppServiceTransaction(object):
- """Represents an application service transaction."""
-
- def __init__(self, service, id, events):
- self.service = service
- self.id = id
- self.events = events
-
- def send(self, as_api):
- """Sends this transaction using the provided AS API interface.
-
- Args:
- as_api(ApplicationServiceApi): The API to use to send.
- Returns:
- A Deferred which resolves to True if the transaction was sent.
- """
- return as_api.push_bulk(
- service=self.service,
- events=self.events,
- txn_id=self.id
- )
-
- def complete(self, store):
- """Completes this transaction as successful.
-
- Marks this transaction ID on the application service and removes the
- transaction contents from the database.
-
- Args:
- store: The database store to operate on.
- Returns:
- A Deferred which resolves to True if the transaction was completed.
- """
- return store.complete_appservice_txn(
- service=self.service,
- txn_id=self.id
- )
-
-
class _EventGrouper(object):
"""Groups events for the same application service together.
"""
@@ -156,14 +117,18 @@ class _TransactionController(object):
# keep track of how many recoverers there are
self.recoverers = []
+ @defer.inlineCallbacks
def start_polling(self):
groups = self.event_grouper.drain_groups()
for service in groups:
- txn_id = self._get_next_txn_id(service)
- txn = AppServiceTransaction(service, txn_id, groups[service])
- self._store_txn(txn)
- if self._is_service_up(service):
- if txn.send(self.as_api):
+ txn = yield self.store.create_appservice_txn(
+ service=service,
+ events=groups[service]
+ )
+ service_is_up = yield self._is_service_up(service)
+ if service_is_up:
+ sent = yield txn.send(self.as_api)
+ if sent:
txn.complete(self.store)
else:
self._start_recoverer(service)
@@ -207,14 +172,10 @@ class _TransactionController(object):
logger.error("Failed to apply appservice state DOWN to service %s",
service)
+ @defer.inlineCallbacks
def _is_service_up(self, service):
- pass
-
- def _get_next_txn_id(self, service):
- pass # TODO work out the next txn_id for this service
-
- def _store_txn(self, txn):
- pass
+ state = yield self.store.get_appservice_state(service)
+ defer.returnValue(state == ApplicationServiceState.UP)
class _Recoverer(object):
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 214f6d99c5..6fde7dcc66 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -354,6 +354,16 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
"""
pass
+ def get_appservice_state(self, service):
+ """Get the application service state.
+
+ Args:
+ service(ApplicationService): The service whose state to set.
+ Returns:
+ A Deferred which resolves to ApplicationServiceState.
+ """
+ pass
+
def set_appservice_state(self, service, state):
"""Set the application service state.
@@ -365,6 +375,18 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
"""
pass
+ def create_appservice_txn(self, service, events):
+ """Atomically creates a new transaction for this application service
+ with the given list of events.
+
+ Args:
+ service(ApplicationService): The service who the transaction is for.
+ events(list<Event>): A list of events to put in the transaction.
+ Returns:
+ ApplicationServiceTransaction: A new transaction.
+ """
+ pass
+
def complete_appservice_txn(self, txn_id, service):
"""Completes an application service transaction.
diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py
index ec8f77c54b..a31755da67 100644
--- a/tests/appservice/test_scheduler.py
+++ b/tests/appservice/test_scheduler.py
@@ -12,9 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from synapse.appservice import ApplicationServiceState, AppServiceTransaction
from synapse.appservice.scheduler import (
- AppServiceScheduler, AppServiceTransaction, _EventGrouper,
- _TransactionController, _Recoverer
+ AppServiceScheduler, _EventGrouper, _TransactionController, _Recoverer
)
from twisted.internet import defer
from ..utils import MockClock
@@ -22,6 +22,116 @@ from mock import Mock
from tests import unittest
+class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.clock = MockClock()
+ self.store = Mock()
+ self.as_api = Mock()
+ self.event_grouper = Mock()
+ self.recoverer = Mock()
+ self.recoverer_fn = Mock(return_value=self.recoverer)
+ self.txnctrl = _TransactionController(
+ clock=self.clock, store=self.store, as_api=self.as_api,
+ event_grouper=self.event_grouper, recoverer_fn=self.recoverer_fn
+ )
+
+ def test_poll_single_group_service_up(self):
+ # Test: The AS is up and the txn is successfully sent.
+ service = Mock()
+ events = [Mock(), Mock()]
+ groups = {}
+ groups[service] = events
+ txn_id = "foobar"
+ txn = Mock(id=txn_id, service=service, events=events)
+
+ # mock methods
+ self.event_grouper.drain_groups = Mock(return_value=groups)
+ self.store.get_appservice_state = Mock(
+ return_value=defer.succeed(ApplicationServiceState.UP)
+ )
+ txn.send = Mock(return_value=defer.succeed(True))
+ self.store.create_appservice_txn = Mock(
+ return_value=defer.succeed(txn)
+ )
+
+ # actual call
+ self.txnctrl.start_polling()
+
+ self.store.create_appservice_txn.assert_called_once_with(
+ service=service, events=events # txn made and saved
+ )
+ self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made
+ txn.complete.assert_called_once_with(self.store) # txn completed
+
+ def test_poll_single_group_service_down(self):
+ # Test: The AS is down so it shouldn't push; Recoverers will do it.
+ # It should still make a transaction though.
+ service = Mock()
+ events = [Mock(), Mock()]
+ groups = {}
+ groups[service] = events
+
+ self.event_grouper.drain_groups = Mock(return_value=groups)
+ txn = Mock(id="idhere", service=service, events=events)
+ self.store.get_appservice_state = Mock(
+ return_value=defer.succeed(ApplicationServiceState.DOWN)
+ )
+ self.store.create_appservice_txn = Mock(
+ return_value=defer.succeed(txn)
+ )
+
+ # actual call
+ self.txnctrl.start_polling()
+
+ self.store.create_appservice_txn.assert_called_once_with(
+ service=service, events=events # txn made and saved
+ )
+ self.assertEquals(0, txn.send.call_count) # txn not sent though
+ self.assertEquals(0, txn.complete.call_count) # or completed
+
+ def test_poll_single_group_service_up(self):
+ # Test: The AS is up and the txn is not sent. A Recoverer is made and
+ # started.
+ service = Mock()
+ events = [Mock(), Mock()]
+ groups = {}
+ groups[service] = events
+ txn_id = "foobar"
+ txn = Mock(id=txn_id, service=service, events=events)
+
+ # mock methods
+ self.event_grouper.drain_groups = Mock(return_value=groups)
+ self.store.get_appservice_state = Mock(
+ return_value=defer.succeed(ApplicationServiceState.UP)
+ )
+ self.store.set_appservice_state = Mock(return_value=defer.succeed(True))
+ txn.send = Mock(return_value=defer.succeed(False)) # fails to send
+ self.store.create_appservice_txn = Mock(
+ return_value=defer.succeed(txn)
+ )
+
+ # actual call
+ self.txnctrl.start_polling()
+
+ self.store.create_appservice_txn.assert_called_once_with(
+ service=service, events=events
+ )
+ self.assertEquals(1, self.recoverer_fn.call_count) # recoverer made
+ self.assertEquals(1, self.recoverer.recover.call_count) # and invoked
+ self.assertEquals(1, len(self.txnctrl.recoverers)) # and stored
+ self.assertEquals(0, txn.complete.call_count) # txn not completed
+ self.store.set_appservice_state.assert_called_once_with(
+ service, ApplicationServiceState.DOWN # service marked as down
+ )
+
+ def test_poll_no_groups(self):
+ self.as_api.push_bulk = Mock()
+ self.event_grouper.drain_groups = Mock(return_value={})
+ self.txnctrl.start_polling()
+ self.assertEquals(0, self.as_api.push_bulk.call_count)
+
+
class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase):
def setUp(self):
@@ -94,6 +204,7 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase):
self.assertEquals(1, txn.complete.call_count)
self.callback.assert_called_once_with(self.recoverer)
+
class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase):
def setUp(self):
|