diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
new file mode 100644
index 0000000000..add1e3879c
--- /dev/null
+++ b/synapse/appservice/scheduler.py
@@ -0,0 +1,224 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+This module controls the reliability for application service transactions.
+
+The nominal flow through this module looks like:
+ _________
+---ASa[e]-->| Event |
+----ASb[e]->| Grouper |<-poll 1/s--+
+--ASa[e]--->|_________| | ASa[e,e] ASb[e]
+ V
+ -````````- +------------+
+ |````````|<--StoreTxn-|Transaction |
+ |Database| | Controller |---> SEND TO AS
+ `--------` +------------+
+What happens on SEND TO AS depends on the state of the Application Service:
+ - If the AS is marked as DOWN, do nothing.
+ - If the AS is marked as UP, send the transaction.
+ * SUCCESS : Increment where the AS is up to txn-wise and nuke the txn
+ contents from the db.
+ * FAILURE : Marked AS as DOWN and start Recoverer.
+
+Recoverer attempts to recover ASes who have died. The flow for this looks like:
+ ,--------------------- backoff++ --------------.
+ V |
+ START ---> Wait exp ------> Get oldest txn ID from ----> FAILURE
+ backoff DB and try to send it
+ ^ |___________
+Mark AS as | V
+UP & quit +---------- YES SUCCESS
+ | | |
+ NO <--- Have more txns? <------ Mark txn success & nuke <-+
+ from db; incr AS pos.
+ Reset backoff.
+
+This is all tied together by the AppServiceScheduler which DIs the required
+components.
+"""
+
+from synapse.appservice import ApplicationServiceState
+from twisted.internet import defer
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class AppServiceScheduler(object):
+ """ Public facing API for this module. Does the required DI to tie the
+ components together. This also serves as the "event_pool", which in this
+ case is a simple array.
+ """
+
+ def __init__(self, clock, store, as_api):
+ self.clock = clock
+ self.store = store
+ self.as_api = as_api
+ self.event_grouper = _EventGrouper()
+
+ def create_recoverer(service, callback):
+ return _Recoverer(clock, store, as_api, service, callback)
+
+ self.txn_ctrl = _TransactionController(
+ clock, store, as_api, self.event_grouper, create_recoverer
+ )
+
+ @defer.inlineCallbacks
+ def start(self):
+ logger.info("Starting appservice scheduler")
+ # check for any DOWN ASes and start recoverers for them.
+ recoverers = yield _Recoverer.start(
+ self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
+ )
+ self.txn_ctrl.add_recoverers(recoverers)
+ self.txn_ctrl.start_polling()
+
+ def submit_event_for_as(self, service, event):
+ self.event_grouper.on_receive(service, event)
+
+
+class _EventGrouper(object):
+ """Groups events for the same application service together.
+ """
+
+ def __init__(self):
+ self.groups = {} # dict of {service: [events]}
+
+ def on_receive(self, service, event):
+ if service not in self.groups:
+ self.groups[service] = []
+ self.groups[service].append(event)
+
+ def drain_groups(self):
+ groups = self.groups
+ self.groups = {}
+ return groups
+
+
+class _TransactionController(object):
+
+ def __init__(self, clock, store, as_api, event_grouper, recoverer_fn):
+ self.clock = clock
+ self.store = store
+ self.as_api = as_api
+ self.event_grouper = event_grouper
+ self.recoverer_fn = recoverer_fn
+ # keep track of how many recoverers there are
+ self.recoverers = []
+
+ @defer.inlineCallbacks
+ def start_polling(self):
+ groups = self.event_grouper.drain_groups()
+ for service in groups:
+ txn = yield self.store.create_appservice_txn(
+ service=service,
+ events=groups[service]
+ )
+ service_is_up = yield self._is_service_up(service)
+ if service_is_up:
+ sent = yield txn.send(self.as_api)
+ if sent:
+ txn.complete(self.store)
+ else:
+ self._start_recoverer(service)
+ self.clock.call_later(1, self.start_polling)
+
+ @defer.inlineCallbacks
+ def on_recovered(self, recoverer):
+ self.recoverers.remove(recoverer)
+ logger.info("Successfully recovered application service AS ID %s",
+ recoverer.service.id)
+ logger.info("Remaining active recoverers: %s", len(self.recoverers))
+ yield self.store.set_appservice_state(
+ recoverer.service,
+ ApplicationServiceState.UP
+ )
+
+ def add_recoverers(self, recoverers):
+ for r in recoverers:
+ self.recoverers.append(r)
+ if len(recoverers) > 0:
+ logger.info("New active recoverers: %s", len(self.recoverers))
+
+ @defer.inlineCallbacks
+ def _start_recoverer(self, service):
+ yield self.store.set_appservice_state(
+ service,
+ ApplicationServiceState.DOWN
+ )
+ logger.info(
+ "Application service falling behind. Starting recoverer. AS ID %s",
+ service.id
+ )
+ recoverer = self.recoverer_fn(service, self.on_recovered)
+ self.add_recoverers([recoverer])
+ recoverer.recover()
+
+ @defer.inlineCallbacks
+ def _is_service_up(self, service):
+ state = yield self.store.get_appservice_state(service)
+ defer.returnValue(state == ApplicationServiceState.UP or state is None)
+
+
+class _Recoverer(object):
+
+ @staticmethod
+ @defer.inlineCallbacks
+ def start(clock, store, as_api, callback):
+ services = yield store.get_appservices_by_state(
+ ApplicationServiceState.DOWN
+ )
+ recoverers = [
+ _Recoverer(clock, store, as_api, s, callback) for s in services
+ ]
+ for r in recoverers:
+ logger.info("Starting recoverer for AS ID %s which was marked as "
+ "DOWN", r.service.id)
+ r.recover()
+ defer.returnValue(recoverers)
+
+ def __init__(self, clock, store, as_api, service, callback):
+ self.clock = clock
+ self.store = store
+ self.as_api = as_api
+ self.service = service
+ self.callback = callback
+ self.backoff_counter = 1
+
+ def recover(self):
+ self.clock.call_later((2 ** self.backoff_counter), self.retry)
+
+ @defer.inlineCallbacks
+ def retry(self):
+ txn = yield self.store.get_oldest_unsent_txn(self.service)
+ if txn:
+ logger.info("Retrying transaction %s for AS ID %s",
+ txn.id, txn.service.id)
+ sent = yield txn.send(self.as_api)
+ if sent:
+ yield txn.complete(self.store)
+ # reset the backoff counter and retry immediately
+ self.backoff_counter = 1
+ yield self.retry()
+ else:
+ # cap the backoff to be around 18h => (2^16) = 65536 secs
+ if self.backoff_counter < 16:
+ self.backoff_counter += 1
+ self.recover()
+ else:
+ self._set_service_recovered()
+
+ def _set_service_recovered(self):
+ self.callback(self)
|