diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index f54df9c9a5..645d7bf6b2 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -71,11 +71,13 @@ class AppServiceScheduler(object):
clock, store, as_api, self.event_grouper, create_recoverer
)
+ @defer.inlineCallbacks
def start(self):
# check for any DOWN ASes and start recoverers for them.
- _Recoverer.start(
+ recoverers = yield _Recoverer.start(
self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
)
+ self.txn_ctrl.add_recoverers(recoverers)
self.txn_ctrl.start_polling()
def submit_event_for_as(self, service, event):
@@ -91,12 +93,34 @@ class AppServiceTransaction(object):
self.events = events
def send(self, as_api):
- # TODO sends this transaction using this as_api
- pass
+ """Sends this transaction using the provided AS API interface.
+
+ Args:
+ as_api(ApplicationServiceApi): The API to use to send.
+ Returns:
+ A Deferred which resolves to True if the transaction was sent.
+ """
+ return as_api.push_bulk(
+ service=self.service,
+ events=self.events,
+ txn_id=self.id
+ )
def complete(self, store):
- # TODO increment txn id on AS and nuke txn contents from db
- pass
+ """Completes this transaction as successful.
+
+ Marks this transaction ID on the application service and removes the
+ transaction contents from the database.
+
+ Args:
+ store: The database store to operate on.
+ Returns:
+ A Deferred which resolves to True if the transaction was completed.
+ """
+ return store.complete_appservice_txn(
+ service=self.service,
+ txn_id=self.id
+ )
class _EventGrouper(object):
@@ -125,6 +149,8 @@ class _TransactionController(object):
self.as_api = as_api
self.event_grouper = event_grouper
self.recoverer_fn = recoverer_fn
+ # keep track of how many recoverers there are
+ self.recoverers = []
def start_polling(self):
groups = self.event_grouper.drain_groups()
@@ -144,6 +170,10 @@ class _TransactionController(object):
# TODO mark AS as UP
pass
+ def add_recoverers(self, recoverers):
+ for r in recoverers:
+ self.recoverers.append(r)
+
def _start_recoverer(self, service):
recoverer = self.recoverer_fn(service, self.on_recovered)
recoverer.recover()
@@ -161,9 +191,15 @@ class _TransactionController(object):
class _Recoverer(object):
@staticmethod
+ @defer.inlineCallbacks
def start(clock, store, as_api, callback):
- # TODO check for DOWN ASes and init recoverers
- pass
+ services = yield store.get_failing_appservices()
+ recoverers = [
+ _Recoverer(clock, store, as_api, s, callback) for s in services
+ ]
+ for r in recoverers:
+ r.recover()
+ defer.returnValue(recoverers)
def __init__(self, clock, store, as_api, service, callback):
self.clock = clock
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index e30265750a..c1762692b9 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -336,3 +336,31 @@ class ApplicationServiceStore(SQLBaseStore):
hs_token=service["hs_token"],
sender=service["sender"]
))
+
+
+class ApplicationServiceTransactionStore(SQLBaseStore):
+
+ def __init__(self, hs):
+ super(ApplicationServiceTransactionStore, self).__init__(hs)
+
+ def get_failing_appservices(self):
+ """Get a list of application services which are down.
+
+ Returns:
+ A Deferred which resolves to a list of ApplicationServices, which
+ may be empty.
+ """
+ pass
+
+ def complete_appservice_txn(self, txn_id, service):
+ """Completes an application service transaction.
+
+ Args:
+ txn_id(str): The transaction ID being completed.
+ service(ApplicationService): The application service which was sent
+ this transaction.
+ Returns:
+ A Deferred which resolves to True if this transaction was completed
+ successfully.
+ """
+ pass
|