summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/appservice/__init__.py48
-rw-r--r--synapse/appservice/api.py13
-rw-r--r--synapse/appservice/scheduler.py230
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/appservice.py86
-rw-r--r--synapse/storage/schema/delta/15/appservice_txns.sql31
-rw-r--r--tests/appservice/test_scheduler.py258
7 files changed, 659 insertions, 9 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index a268a6bcc4..c60db16b74 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -20,6 +20,50 @@ import re
 logger = logging.getLogger(__name__)
 
 
+class ApplicationServiceState(object):
+    DOWN = "down"
+    UP = "up"
+
+
+class AppServiceTransaction(object):
+    """Represents an application service transaction."""
+
+    def __init__(self, service, id, events):
+        self.service = service
+        self.id = id
+        self.events = events
+
+    def send(self, as_api):
+        """Sends this transaction using the provided AS API interface.
+
+        Args:
+            as_api(ApplicationServiceApi): The API to use to send.
+        Returns:
+            A Deferred which resolves to True if the transaction was sent.
+        """
+        return as_api.push_bulk(
+            service=self.service,
+            events=self.events,
+            txn_id=self.id
+        )
+
+    def complete(self, store):
+        """Completes this transaction as successful.
+
+        Marks this transaction ID on the application service and removes the
+        transaction contents from the database.
+
+        Args:
+            store: The database store to operate on.
+        Returns:
+            A Deferred which resolves to True if the transaction was completed.
+        """
+        return store.complete_appservice_txn(
+            service=self.service,
+            txn_id=self.id
+        )
+
+
 class ApplicationService(object):
     """Defines an application service. This definition is mostly what is
     provided to the /register AS API.
@@ -35,13 +79,13 @@ class ApplicationService(object):
     NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS]
 
     def __init__(self, token, url=None, namespaces=None, hs_token=None,
-                 sender=None, txn_id=None):
+                 sender=None, id=None):
         self.token = token
         self.url = url
         self.hs_token = hs_token
         self.sender = sender
         self.namespaces = self._check_namespaces(namespaces)
-        self.txn_id = txn_id
+        self.id = id
 
     def _check_namespaces(self, namespaces):
         # Sanity check that it is of the form:
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index c2179f8d55..c17fb219c5 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -72,11 +72,16 @@ class ApplicationServiceApi(SimpleHttpClient):
         defer.returnValue(False)
 
     @defer.inlineCallbacks
-    def push_bulk(self, service, events):
+    def push_bulk(self, service, events, txn_id=None):
         events = self._serialize(events)
 
+        if txn_id is None:
+            logger.warning("push_bulk: Missing txn ID sending events to %s",
+                           service.url)
+            txn_id = str(0)
+
         uri = service.url + ("/transactions/%s" %
-                             urllib.quote(str(0)))  # TODO txn_ids
+                             urllib.quote(txn_id))
         response = None
         try:
             response = yield self.put_json(
@@ -97,8 +102,8 @@ class ApplicationServiceApi(SimpleHttpClient):
         defer.returnValue(False)
 
     @defer.inlineCallbacks
-    def push(self, service, event):
-        response = yield self.push_bulk(service, [event])
+    def push(self, service, event, txn_id=None):
+        response = yield self.push_bulk(service, [event], txn_id)
         defer.returnValue(response)
 
     def _serialize(self, events):
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
new file mode 100644
index 0000000000..ee5978da6e
--- /dev/null
+++ b/synapse/appservice/scheduler.py
@@ -0,0 +1,230 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+This module controls the reliability for application service transactions.
+
+The nominal flow through this module looks like:
+             _________
+---ASa[e]-->|  Event  |
+----ASb[e]->| Grouper |<-poll 1/s--+
+--ASa[e]--->|_________|            | ASa[e,e]  ASb[e]
+                                   V
+      -````````-            +------------+
+      |````````|<--StoreTxn-|Transaction |
+      |Database|            | Controller |---> SEND TO AS
+      `--------`            +------------+
+What happens on SEND TO AS depends on the state of the Application Service:
+ - If the AS is marked as DOWN, do nothing.
+ - If the AS is marked as UP, send the transaction.
+     * SUCCESS : Increment where the AS is up to txn-wise and nuke the txn
+                 contents from the db.
+     * FAILURE : Marked AS as DOWN and start Recoverer.
+
+Recoverer attempts to recover ASes who have died. The flow for this looks like:
+                ,--------------------- backoff++ --------------.
+               V                                               |
+  START ---> Wait exp ------> Get oldest txn ID from ----> FAILURE
+             backoff           DB and try to send it
+                                 ^                |___________
+Mark AS as                       |                            V
+UP & quit           +---------- YES                       SUCCESS
+    |               |                                         |
+    NO <--- Have more txns? <------ Mark txn success & nuke <-+
+                                      from db; incr AS pos.
+                                         Reset backoff.
+
+This is all tied together by the AppServiceScheduler which DIs the required
+components.
+"""
+
+from synapse.appservice import ApplicationServiceState
+from twisted.internet import defer
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class AppServiceScheduler(object):
+    """ Public facing API for this module. Does the required DI to tie the
+    components together. This also serves as the "event_pool", which in this
+    case is a simple array.
+    """
+
+    def __init__(self, clock, store, as_api):
+        self.clock = clock
+        self.store = store
+        self.as_api = as_api
+        self.event_grouper = _EventGrouper()
+
+        def create_recoverer(service, callback):
+            return _Recoverer(clock, store, as_api, service, callback)
+
+        self.txn_ctrl = _TransactionController(
+            clock, store, as_api, self.event_grouper, create_recoverer
+        )
+
+    @defer.inlineCallbacks
+    def start(self):
+        # check for any DOWN ASes and start recoverers for them.
+        recoverers = yield _Recoverer.start(
+            self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
+        )
+        self.txn_ctrl.add_recoverers(recoverers)
+        self.txn_ctrl.start_polling()
+
+    def submit_event_for_as(self, service, event):
+        self.event_grouper.on_receive(service, event)
+
+
+class _EventGrouper(object):
+    """Groups events for the same application service together.
+    """
+
+    def __init__(self):
+        self.groups = {}  # dict of {service: [events]}
+
+    def on_receive(self, service, event):
+        if service not in self.groups:
+            self.groups[service] = []
+        self.groups[service].append(event)
+
+    def drain_groups(self):
+        groups = self.groups
+        self.groups = {}
+        return groups
+
+
+class _TransactionController(object):
+
+    def __init__(self, clock, store, as_api, event_grouper, recoverer_fn):
+        self.clock = clock
+        self.store = store
+        self.as_api = as_api
+        self.event_grouper = event_grouper
+        self.recoverer_fn = recoverer_fn
+        # keep track of how many recoverers there are
+        self.recoverers = []
+
+    @defer.inlineCallbacks
+    def start_polling(self):
+        groups = self.event_grouper.drain_groups()
+        for service in groups:
+            txn = yield self.store.create_appservice_txn(
+                service=service,
+                events=groups[service]
+            )
+            service_is_up = yield self._is_service_up(service)
+            if service_is_up:
+                sent = yield txn.send(self.as_api)
+                if sent:
+                    txn.complete(self.store)
+                else:
+                    self._start_recoverer(service)
+        self.clock.call_later(1000, self.start_polling)
+
+    @defer.inlineCallbacks
+    def on_recovered(self, recoverer):
+        self.recoverers.remove(recoverer)
+        logger.info("Successfully recovered application service: %s",
+                    recoverer.service)
+        logger.info("Active recoverers: %s", len(self.recoverers))
+        applied_state = yield self.store.set_appservice_state(
+            recoverer.service,
+            ApplicationServiceState.UP
+        )
+        if not applied_state:
+            logger.error("Failed to apply appservice state UP to service %s",
+                         recoverer.service)
+
+    def add_recoverers(self, recoverers):
+        for r in recoverers:
+            self.recoverers.append(r)
+        if len(recoverers) > 0:
+            logger.info("Active recoverers: %s", len(self.recoverers))
+
+    @defer.inlineCallbacks
+    def _start_recoverer(self, service):
+        applied_state = yield self.store.set_appservice_state(
+            service,
+            ApplicationServiceState.DOWN
+        )
+        if applied_state:
+            logger.info(
+                "Application service falling behind. Starting recoverer. %s",
+                service
+            )
+            recoverer = self.recoverer_fn(service, self.on_recovered)
+            self.add_recoverers([recoverer])
+            recoverer.recover()
+        else:
+            logger.error("Failed to apply appservice state DOWN to service %s",
+                         service)
+
+    @defer.inlineCallbacks
+    def _is_service_up(self, service):
+        state = yield self.store.get_appservice_state(service)
+        defer.returnValue(state == ApplicationServiceState.UP)
+
+
+class _Recoverer(object):
+
+    @staticmethod
+    @defer.inlineCallbacks
+    def start(clock, store, as_api, callback):
+        services = yield store.get_appservices_by_state(
+            ApplicationServiceState.DOWN
+        )
+        recoverers = [
+            _Recoverer(clock, store, as_api, s, callback) for s in services
+        ]
+        for r in recoverers:
+            r.recover()
+        defer.returnValue(recoverers)
+
+    def __init__(self, clock, store, as_api, service, callback):
+        self.clock = clock
+        self.store = store
+        self.as_api = as_api
+        self.service = service
+        self.callback = callback
+        self.backoff_counter = 1
+
+    def recover(self):
+        self.clock.call_later(1000 * (2 ** self.backoff_counter), self.retry)
+
+    @defer.inlineCallbacks
+    def retry(self):
+        txn = yield self._get_oldest_txn()
+        if txn:
+            if txn.send(self.as_api):
+                txn.complete(self.store)
+                # reset the backoff counter and retry immediately
+                self.backoff_counter = 1
+                yield self.retry()
+            else:
+                # cap the backoff to be around 18h => (2^16) = 65536 secs
+                if self.backoff_counter < 16:
+                    self.backoff_counter += 1
+                self.recover()
+        else:
+            self._set_service_recovered()
+
+    def _set_service_recovered(self):
+        self.callback(self)
+
+    @defer.inlineCallbacks
+    def _get_oldest_txn(self):
+        txn = yield self.store.get_oldest_unsent_txn(self.service)
+        defer.returnValue(txn)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index a3ff995695..dfce5224a9 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -57,7 +57,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 14
+SCHEMA_VERSION = 15
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index e30265750a..582269b8d5 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -302,6 +302,7 @@ class ApplicationServiceStore(SQLBaseStore):
             if as_token not in services:
                 # add the service
                 services[as_token] = {
+                    "id": res["as_id"],
                     "url": res["url"],
                     "token": as_token,
                     "hs_token": res["hs_token"],
@@ -326,7 +327,6 @@ class ApplicationServiceStore(SQLBaseStore):
             except JSONDecodeError:
                 logger.error("Bad regex object '%s'", res["regex"])
 
-        # TODO get last successful txn id f.e. service
         for service in services.values():
             logger.info("Found application service: %s", service)
             self.services_cache.append(ApplicationService(
@@ -334,5 +334,87 @@ class ApplicationServiceStore(SQLBaseStore):
                 url=service["url"],
                 namespaces=service["namespaces"],
                 hs_token=service["hs_token"],
-                sender=service["sender"]
+                sender=service["sender"],
+                id=service["id"]
             ))
+
+
+class ApplicationServiceTransactionStore(SQLBaseStore):
+
+    def __init__(self, hs):
+        super(ApplicationServiceTransactionStore, self).__init__(hs)
+
+    def get_appservices_by_state(self, state):
+        """Get a list of application services based on their state.
+
+        Args:
+            state(ApplicationServiceState): The state to filter on.
+        Returns:
+            A Deferred which resolves to a list of ApplicationServices, which
+            may be empty.
+        """
+        pass
+
+    def get_appservice_state(self, service):
+        """Get the application service state.
+
+        Args:
+            service(ApplicationService): The service whose state to set.
+        Returns:
+            A Deferred which resolves to ApplicationServiceState.
+        """
+        pass
+
+    def set_appservice_state(self, service, state):
+        """Set the application service state.
+
+        Args:
+            service(ApplicationService): The service whose state to set.
+            state(ApplicationServiceState): The connectivity state to apply.
+        Returns:
+            A Deferred which resolves to True if the state was set successfully.
+        """
+        pass
+
+    def create_appservice_txn(self, service, events):
+        """Atomically creates a new transaction for this application service
+        with the given list of events.
+
+        Args:
+            service(ApplicationService): The service who the transaction is for.
+            events(list<Event>): A list of events to put in the transaction.
+        Returns:
+            AppServiceTransaction: A new transaction.
+        """
+        # TODO: work out txn id (highest txn id for this service += 1)
+        # TODO: Within same db transaction, Insert new txn into txn table
+        pass
+
+    def complete_appservice_txn(self, txn_id, service):
+        """Completes an application service transaction.
+
+        Args:
+            txn_id(str): The transaction ID being completed.
+            service(ApplicationService): The application service which was sent
+            this transaction.
+        Returns:
+            A Deferred which resolves to True if this transaction was completed
+            successfully.
+        """
+        # TODO: Set current txn_id for AS to 'txn_id'
+        # TODO: Delete txn contents
+        pass
+
+    def get_oldest_unsent_txn(self, service):
+        """Get the oldest transaction which has not been sent for this
+        service.
+
+        Args:
+            service(ApplicationService): The app service to get the oldest txn.
+        Returns:
+            A Deferred which resolves to an AppServiceTransaction or
+            None.
+        """
+        # TODO: Monotonically increasing txn ids, so just select the smallest
+        # one in the txns table (we delete them when they are sent)
+        pass
diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql
new file mode 100644
index 0000000000..11f0c799aa
--- /dev/null
+++ b/synapse/storage/schema/delta/15/appservice_txns.sql
@@ -0,0 +1,31 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS application_services_state(
+    as_id INTEGER PRIMARY KEY,
+    state TEXT NOT NULL,
+    last_txn TEXT,
+    FOREIGN KEY(as_id) REFERENCES application_services(id)
+);
+
+CREATE TABLE IF NOT EXISTS application_services_txns(
+    as_id INTEGER NOT NULL,
+    txn_id INTEGER NOT NULL,
+    content TEXT NOT NULL,
+    UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK
+);
+
+
+
diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py
new file mode 100644
index 0000000000..9532bf66b8
--- /dev/null
+++ b/tests/appservice/test_scheduler.py
@@ -0,0 +1,258 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from synapse.appservice import ApplicationServiceState, AppServiceTransaction
+from synapse.appservice.scheduler import (
+    _EventGrouper, _TransactionController, _Recoverer
+)
+from twisted.internet import defer
+from ..utils import MockClock
+from mock import Mock
+from tests import unittest
+
+
+class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.clock = MockClock()
+        self.store = Mock()
+        self.as_api = Mock()
+        self.event_grouper = Mock()
+        self.recoverer = Mock()
+        self.recoverer_fn = Mock(return_value=self.recoverer)
+        self.txnctrl = _TransactionController(
+            clock=self.clock, store=self.store, as_api=self.as_api,
+            event_grouper=self.event_grouper, recoverer_fn=self.recoverer_fn
+        )
+
+    def test_poll_single_group_service_up(self):
+        # Test: The AS is up and the txn is successfully sent.
+        service = Mock()
+        events = [Mock(), Mock()]
+        groups = {}
+        groups[service] = events
+        txn_id = "foobar"
+        txn = Mock(id=txn_id, service=service, events=events)
+
+        # mock methods
+        self.event_grouper.drain_groups = Mock(return_value=groups)
+        self.store.get_appservice_state = Mock(
+            return_value=defer.succeed(ApplicationServiceState.UP)
+        )
+        txn.send = Mock(return_value=defer.succeed(True))
+        self.store.create_appservice_txn = Mock(
+            return_value=defer.succeed(txn)
+        )
+
+        # actual call
+        self.txnctrl.start_polling()
+
+        self.store.create_appservice_txn.assert_called_once_with(
+            service=service, events=events  # txn made and saved
+        )
+        self.assertEquals(0, len(self.txnctrl.recoverers))  # no recoverer made
+        txn.complete.assert_called_once_with(self.store)  # txn completed
+
+    def test_poll_single_group_service_down(self):
+        # Test: The AS is down so it shouldn't push; Recoverers will do it.
+        # It should still make a transaction though.
+        service = Mock()
+        events = [Mock(), Mock()]
+        groups = {}
+        groups[service] = events
+
+        self.event_grouper.drain_groups = Mock(return_value=groups)
+        txn = Mock(id="idhere", service=service, events=events)
+        self.store.get_appservice_state = Mock(
+            return_value=defer.succeed(ApplicationServiceState.DOWN)
+        )
+        self.store.create_appservice_txn = Mock(
+            return_value=defer.succeed(txn)
+        )
+
+        # actual call
+        self.txnctrl.start_polling()
+
+        self.store.create_appservice_txn.assert_called_once_with(
+            service=service, events=events  # txn made and saved
+        )
+        self.assertEquals(0, txn.send.call_count)  # txn not sent though
+        self.assertEquals(0, txn.complete.call_count)  # or completed
+
+    def test_poll_single_group_service_up(self):
+        # Test: The AS is up and the txn is not sent. A Recoverer is made and
+        # started.
+        service = Mock()
+        events = [Mock(), Mock()]
+        groups = {}
+        groups[service] = events
+        txn_id = "foobar"
+        txn = Mock(id=txn_id, service=service, events=events)
+
+        # mock methods
+        self.event_grouper.drain_groups = Mock(return_value=groups)
+        self.store.get_appservice_state = Mock(
+            return_value=defer.succeed(ApplicationServiceState.UP)
+        )
+        self.store.set_appservice_state = Mock(return_value=defer.succeed(True))
+        txn.send = Mock(return_value=defer.succeed(False))  # fails to send
+        self.store.create_appservice_txn = Mock(
+            return_value=defer.succeed(txn)
+        )
+
+        # actual call
+        self.txnctrl.start_polling()
+
+        self.store.create_appservice_txn.assert_called_once_with(
+            service=service, events=events
+        )
+        self.assertEquals(1, self.recoverer_fn.call_count)  # recoverer made
+        self.assertEquals(1, self.recoverer.recover.call_count)  # and invoked
+        self.assertEquals(1, len(self.txnctrl.recoverers))  # and stored
+        self.assertEquals(0, txn.complete.call_count)  # txn not completed
+        self.store.set_appservice_state.assert_called_once_with(
+            service, ApplicationServiceState.DOWN  # service marked as down
+        )
+
+    def test_poll_no_groups(self):
+        self.as_api.push_bulk = Mock()
+        self.event_grouper.drain_groups = Mock(return_value={})
+        self.txnctrl.start_polling()
+        self.assertEquals(0, self.as_api.push_bulk.call_count)
+
+
+class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.clock = MockClock()
+        self.as_api = Mock()
+        self.store = Mock()
+        self.service = Mock()
+        self.callback = Mock()
+        self.recoverer = _Recoverer(
+            clock=self.clock,
+            as_api=self.as_api,
+            store=self.store,
+            service=self.service,
+            callback=self.callback,
+        )
+
+    def test_recover_single_txn(self):
+        txn = Mock()
+        # return one txn to send, then no more old txns
+        txns = [txn, None]
+
+        def take_txn(*args, **kwargs):
+            return defer.succeed(txns.pop(0))
+        self.store.get_oldest_unsent_txn = Mock(side_effect=take_txn)
+
+        self.recoverer.recover()
+        # shouldn't have called anything prior to waiting for exp backoff
+        self.assertEquals(0, self.store.get_oldest_unsent_txn.call_count)
+        txn.send = Mock(return_value=True)
+        # wait for exp backoff
+        self.clock.advance_time(2000)
+        self.assertEquals(1, txn.send.call_count)
+        self.assertEquals(1, txn.complete.call_count)
+        # 2 because it needs to get None to know there are no more txns
+        self.assertEquals(2, self.store.get_oldest_unsent_txn.call_count)
+        self.callback.assert_called_once_with(self.recoverer)
+        self.assertEquals(self.recoverer.service, self.service)
+
+    def test_recover_retry_txn(self):
+        txn = Mock()
+        txns = [txn, None]
+        pop_txn = False
+
+        def take_txn(*args, **kwargs):
+            if pop_txn:
+                return defer.succeed(txns.pop(0))
+            else:
+                return defer.succeed(txn)
+        self.store.get_oldest_unsent_txn = Mock(side_effect=take_txn)
+
+        self.recoverer.recover()
+        self.assertEquals(0, self.store.get_oldest_unsent_txn.call_count)
+        txn.send = Mock(return_value=False)
+        self.clock.advance_time(2000)
+        self.assertEquals(1, txn.send.call_count)
+        self.assertEquals(0, txn.complete.call_count)
+        self.assertEquals(0, self.callback.call_count)
+        self.clock.advance_time(4000)
+        self.assertEquals(2, txn.send.call_count)
+        self.assertEquals(0, txn.complete.call_count)
+        self.assertEquals(0, self.callback.call_count)
+        self.clock.advance_time(8000)
+        self.assertEquals(3, txn.send.call_count)
+        self.assertEquals(0, txn.complete.call_count)
+        self.assertEquals(0, self.callback.call_count)
+        txn.send = Mock(return_value=True)  # successfully send the txn
+        pop_txn = True  # returns the txn the first time, then no more.
+        self.clock.advance_time(16000)
+        self.assertEquals(1, txn.send.call_count)  # new mock reset call count
+        self.assertEquals(1, txn.complete.call_count)
+        self.callback.assert_called_once_with(self.recoverer)
+
+
+class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.grouper = _EventGrouper()
+
+    def test_drain_single_event(self):
+        service = Mock()
+        event = Mock()
+        self.grouper.on_receive(service, event)
+        groups = self.grouper.drain_groups()
+        self.assertTrue(service in groups)
+        self.assertEquals([event], groups[service])
+        self.assertEquals(1, len(groups.keys()))
+        # no more events
+        self.assertEquals(self.grouper.drain_groups(), {})
+
+    def test_drain_multiple_events(self):
+        service = Mock()
+        events = [Mock(), Mock(), Mock()]
+        for e in events:
+            self.grouper.on_receive(service, e)
+        groups = self.grouper.drain_groups()
+        self.assertTrue(service in groups)
+        self.assertEquals(events, groups[service])
+        # no more events
+        self.assertEquals(self.grouper.drain_groups(), {})
+
+    def test_drain_multiple_services(self):
+        services = [Mock(), Mock(), Mock()]
+        events_a = [Mock(), Mock()]
+        events_b = [Mock()]
+        events_c = [Mock(), Mock(), Mock(), Mock()]
+        mappings = {
+            services[0]: events_a,
+            services[1]: events_b,
+            services[2]: events_c
+        }
+        for e in events_b:
+            self.grouper.on_receive(services[1], e)
+        for e in events_c:
+            self.grouper.on_receive(services[2], e)
+        for e in events_a:
+            self.grouper.on_receive(services[0], e)
+
+        groups = self.grouper.drain_groups()
+        for service in services:
+            self.assertTrue(service in groups)
+            self.assertEquals(mappings[service], groups[service])
+        self.assertEquals(3, len(groups.keys()))
+        # no more events
+        self.assertEquals(self.grouper.drain_groups(), {})