diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 2b3aa3b0ea..50ad3b8e83 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -88,45 +88,6 @@ class AppServiceScheduler(object):
self.event_grouper.on_receive(service, event)
-class AppServiceTransaction(object):
- """Represents an application service transaction."""
-
- def __init__(self, service, id, events):
- self.service = service
- self.id = id
- self.events = events
-
- def send(self, as_api):
- """Sends this transaction using the provided AS API interface.
-
- Args:
- as_api(ApplicationServiceApi): The API to use to send.
- Returns:
- A Deferred which resolves to True if the transaction was sent.
- """
- return as_api.push_bulk(
- service=self.service,
- events=self.events,
- txn_id=self.id
- )
-
- def complete(self, store):
- """Completes this transaction as successful.
-
- Marks this transaction ID on the application service and removes the
- transaction contents from the database.
-
- Args:
- store: The database store to operate on.
- Returns:
- A Deferred which resolves to True if the transaction was completed.
- """
- return store.complete_appservice_txn(
- service=self.service,
- txn_id=self.id
- )
-
-
class _EventGrouper(object):
"""Groups events for the same application service together.
"""
@@ -156,14 +117,18 @@ class _TransactionController(object):
# keep track of how many recoverers there are
self.recoverers = []
+ @defer.inlineCallbacks
def start_polling(self):
groups = self.event_grouper.drain_groups()
for service in groups:
- txn_id = self._get_next_txn_id(service)
- txn = AppServiceTransaction(service, txn_id, groups[service])
- self._store_txn(txn)
- if self._is_service_up(service):
- if txn.send(self.as_api):
+ txn = yield self.store.create_appservice_txn(
+ service=service,
+ events=groups[service]
+ )
+ service_is_up = yield self._is_service_up(service)
+ if service_is_up:
+ sent = yield txn.send(self.as_api)
+ if sent:
txn.complete(self.store)
else:
self._start_recoverer(service)
@@ -207,14 +172,10 @@ class _TransactionController(object):
logger.error("Failed to apply appservice state DOWN to service %s",
service)
+ @defer.inlineCallbacks
def _is_service_up(self, service):
- pass
-
- def _get_next_txn_id(self, service):
- pass # TODO work out the next txn_id for this service
-
- def _store_txn(self, txn):
- pass
+ state = yield self.store.get_appservice_state(service)
+ defer.returnValue(state == ApplicationServiceState.UP)
class _Recoverer(object):
|