summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/appservice/scheduler.py50
-rw-r--r--synapse/storage/appservice.py28
2 files changed, 71 insertions, 7 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index f54df9c9a5..645d7bf6b2 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -71,11 +71,13 @@ class AppServiceScheduler(object):
             clock, store, as_api, self.event_grouper, create_recoverer
         )
 
+    @defer.inlineCallbacks
     def start(self):
         # check for any DOWN ASes and start recoverers for them.
-        _Recoverer.start(
+        recoverers = yield _Recoverer.start(
             self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
         )
+        self.txn_ctrl.add_recoverers(recoverers)
         self.txn_ctrl.start_polling()
 
     def submit_event_for_as(self, service, event):
@@ -91,12 +93,34 @@ class AppServiceTransaction(object):
         self.events = events
 
     def send(self, as_api):
-        # TODO sends this transaction using this as_api
-        pass
+        """Sends this transaction using the provided AS API interface.
+
+        Args:
+            as_api(ApplicationServiceApi): The API to use to send.
+        Returns:
+            A Deferred which resolves to True if the transaction was sent.
+        """
+        return as_api.push_bulk(
+            service=self.service,
+            events=self.events,
+            txn_id=self.id
+        )
 
     def complete(self, store):
-        # TODO increment txn id on AS and nuke txn contents from db
-        pass
+        """Completes this transaction as successful.
+
+        Marks this transaction ID on the application service and removes the
+        transaction contents from the database.
+
+        Args:
+            store: The database store to operate on.
+        Returns:
+            A Deferred which resolves to True if the transaction was completed.
+        """
+        return store.complete_appservice_txn(
+            service=self.service,
+            txn_id=self.id
+        )
 
 
 class _EventGrouper(object):
@@ -125,6 +149,8 @@ class _TransactionController(object):
         self.as_api = as_api
         self.event_grouper = event_grouper
         self.recoverer_fn = recoverer_fn
+        # keep track of how many recoverers there are
+        self.recoverers = []
 
     def start_polling(self):
         groups = self.event_grouper.drain_groups()
@@ -144,6 +170,10 @@ class _TransactionController(object):
         # TODO mark AS as UP
         pass
 
+    def add_recoverers(self, recoverers):
+        for r in recoverers:
+            self.recoverers.append(r)
+
     def _start_recoverer(self, service):
         recoverer = self.recoverer_fn(service, self.on_recovered)
         recoverer.recover()
@@ -161,9 +191,15 @@ class _TransactionController(object):
 class _Recoverer(object):
 
     @staticmethod
+    @defer.inlineCallbacks
     def start(clock, store, as_api, callback):
-        # TODO check for DOWN ASes and init recoverers
-        pass
+        services = yield store.get_failing_appservices()
+        recoverers = [
+            _Recoverer(clock, store, as_api, s, callback) for s in services
+        ]
+        for r in recoverers:
+            r.recover()
+        defer.returnValue(recoverers)
 
     def __init__(self, clock, store, as_api, service, callback):
         self.clock = clock
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index e30265750a..c1762692b9 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -336,3 +336,31 @@ class ApplicationServiceStore(SQLBaseStore):
                 hs_token=service["hs_token"],
                 sender=service["sender"]
             ))
+
+
+class ApplicationServiceTransactionStore(SQLBaseStore):
+
+    def __init__(self, hs):
+        super(ApplicationServiceTransactionStore, self).__init__(hs)
+
+    def get_failing_appservices(self):
+        """Get a list of application services which are down.
+
+        Returns:
+            A Deferred which resolves to a list of ApplicationServices, which
+            may be empty.
+        """
+        pass
+
+    def complete_appservice_txn(self, txn_id, service):
+        """Completes an application service transaction.
+
+        Args:
+            txn_id(str): The transaction ID being completed.
+            service(ApplicationService): The application service which was sent
+            this transaction.
+        Returns:
+            A Deferred which resolves to True if this transaction was completed
+            successfully.
+        """
+        pass