diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index f54df9c9a5..645d7bf6b2 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -71,11 +71,13 @@ class AppServiceScheduler(object):
clock, store, as_api, self.event_grouper, create_recoverer
)
+ @defer.inlineCallbacks
def start(self):
# check for any DOWN ASes and start recoverers for them.
- _Recoverer.start(
+ recoverers = yield _Recoverer.start(
self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
)
+ self.txn_ctrl.add_recoverers(recoverers)
self.txn_ctrl.start_polling()
def submit_event_for_as(self, service, event):
@@ -91,12 +93,34 @@ class AppServiceTransaction(object):
self.events = events
def send(self, as_api):
- # TODO sends this transaction using this as_api
- pass
+ """Sends this transaction using the provided AS API interface.
+
+ Args:
+ as_api(ApplicationServiceApi): The API to use to send.
+ Returns:
+ A Deferred which resolves to True if the transaction was sent.
+ """
+ return as_api.push_bulk(
+ service=self.service,
+ events=self.events,
+ txn_id=self.id
+ )
def complete(self, store):
- # TODO increment txn id on AS and nuke txn contents from db
- pass
+ """Completes this transaction as successful.
+
+ Marks this transaction ID on the application service and removes the
+ transaction contents from the database.
+
+ Args:
+ store: The database store to operate on.
+ Returns:
+ A Deferred which resolves to True if the transaction was completed.
+ """
+ return store.complete_appservice_txn(
+ service=self.service,
+ txn_id=self.id
+ )
class _EventGrouper(object):
@@ -125,6 +149,8 @@ class _TransactionController(object):
self.as_api = as_api
self.event_grouper = event_grouper
self.recoverer_fn = recoverer_fn
+ # keep track of how many recoverers there are
+ self.recoverers = []
def start_polling(self):
groups = self.event_grouper.drain_groups()
@@ -144,6 +170,10 @@ class _TransactionController(object):
# TODO mark AS as UP
pass
+ def add_recoverers(self, recoverers):
+ for r in recoverers:
+ self.recoverers.append(r)
+
def _start_recoverer(self, service):
recoverer = self.recoverer_fn(service, self.on_recovered)
recoverer.recover()
@@ -161,9 +191,15 @@ class _TransactionController(object):
class _Recoverer(object):
@staticmethod
+ @defer.inlineCallbacks
def start(clock, store, as_api, callback):
- # TODO check for DOWN ASes and init recoverers
- pass
+ services = yield store.get_failing_appservices()
+ recoverers = [
+ _Recoverer(clock, store, as_api, s, callback) for s in services
+ ]
+ for r in recoverers:
+ r.recover()
+ defer.returnValue(recoverers)
def __init__(self, clock, store, as_api, service, callback):
self.clock = clock
|