Diffstat (limited to 'synapse/appservice/scheduler.py')
-rw-r--r--  synapse/appservice/scheduler.py  224
1 file changed, 224 insertions, 0 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
new file mode 100644
index 0000000000..add1e3879c
--- /dev/null
+++ b/synapse/appservice/scheduler.py
@@ -0,0 +1,224 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+This module controls the reliability for application service transactions.
+
+The nominal flow through this module looks like:
+             _________
+---ASa[e]-->|  Event  |
+----ASb[e]->| Grouper |<-poll 1/s--+
+--ASa[e]--->|_________|            | ASa[e,e]  ASb[e]
+                                   V
+      -````````-            +------------+
+      |````````|<--StoreTxn-|Transaction |
+      |Database|            | Controller |---> SEND TO AS
+      `--------`            +------------+
+What happens on SEND TO AS depends on the state of the Application Service:
+ - If the AS is marked as DOWN, do nothing.
+ - If the AS is marked as UP, send the transaction.
+     * SUCCESS : Advance the AS's txn position (the last txn it successfully
+                 received) and nuke the txn contents from the db.
+     * FAILURE : Mark the AS as DOWN and start a Recoverer.
+
+Recoverer attempts to recover ASes which have died. The flow for this looks like:
+                ,--------------------- backoff++ --------------.
+               V                                               |
+  START ---> Wait exp ------> Get oldest txn ID from ----> FAILURE
+             backoff           DB and try to send it
+                                 ^                |___________
+Mark AS as                       |                            V
+UP & quit           +---------- YES                       SUCCESS
+    |               |                                         |
+    NO <--- Have more txns? <------ Mark txn success & nuke <-+
+                                      from db; incr AS pos.
+                                         Reset backoff.
+
+This is all tied together by the AppServiceScheduler which dependency-injects
+(DIs) the required components.
+"""
+
+from synapse.appservice import ApplicationServiceState
+from twisted.internet import defer
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class AppServiceScheduler(object):
+    """ Public facing API for this module. Does the required DI to tie the
+    components together. This also serves as the "event_pool", which in this
+    case is a simple array.
+    """
+
+    def __init__(self, clock, store, as_api):
+        self.clock = clock
+        self.store = store
+        self.as_api = as_api
+        self.event_grouper = _EventGrouper()
+
+        def create_recoverer(service, callback):
+            return _Recoverer(clock, store, as_api, service, callback)
+
+        self.txn_ctrl = _TransactionController(
+            clock, store, as_api, self.event_grouper, create_recoverer
+        )
+
+    @defer.inlineCallbacks
+    def start(self):
+        logger.info("Starting appservice scheduler")
+        # check for any DOWN ASes and start recoverers for them.
+        recoverers = yield _Recoverer.start(
+            self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
+        )
+        self.txn_ctrl.add_recoverers(recoverers)
+        self.txn_ctrl.start_polling()
+
+    def submit_event_for_as(self, service, event):
+        self.event_grouper.on_receive(service, event)
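A rough sketch of how a caller is expected to drive this public API. The string
service name and event dict are stand-ins to keep the sketch self-contained; the
real homeserver injects its own clock, datastore and appservice API client, and
passes real ApplicationService/event objects:

    from synapse.appservice.scheduler import AppServiceScheduler

    # clock/store/as_api are only stored by the constructor, so None is
    # enough to show the wiring shape here.
    scheduler = AppServiceScheduler(clock=None, store=None, as_api=None)
    scheduler.submit_event_for_as("my-bridge-as", {"type": "m.room.message"})
    # scheduler.start() would kick off the recoverers and the 1s polling
    # loop, but needs working clock/store/as_api objects behind it.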
+
+
+class _EventGrouper(object):
+    """Groups events for the same application service together.
+    """
+
+    def __init__(self):
+        self.groups = {}  # dict of {service: [events]}
+
+    def on_receive(self, service, event):
+        if service not in self.groups:
+            self.groups[service] = []
+        self.groups[service].append(event)
+
+    def drain_groups(self):
+        groups = self.groups
+        self.groups = {}
+        return groups
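The grouper's contract in miniature (plain strings stand in for the
ApplicationService and event objects used in practice):

    from synapse.appservice.scheduler import _EventGrouper

    grouper = _EventGrouper()
    grouper.on_receive("as_a", "event1")
    grouper.on_receive("as_b", "event2")
    grouper.on_receive("as_a", "event3")
    print(grouper.drain_groups())  # {'as_a': ['event1', 'event3'], 'as_b': ['event2']}
                                   # (key order may vary)
    print(grouper.drain_groups())  # {} - draining resets the grouper for the next poll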
+
+
+class _TransactionController(object):
+    """Creates transactions from drained event groups, sends them, and kicks
+    off a _Recoverer whenever a send fails.
+    """
+
+    def __init__(self, clock, store, as_api, event_grouper, recoverer_fn):
+        self.clock = clock
+        self.store = store
+        self.as_api = as_api
+        self.event_grouper = event_grouper
+        self.recoverer_fn = recoverer_fn
+        # the Recoverers currently trying to revive a DOWN AS
+        self.recoverers = []
+
+    @defer.inlineCallbacks
+    def start_polling(self):
+        # drain the events received since the last poll: one txn per AS
+        groups = self.event_grouper.drain_groups()
+        for service in groups:
+            txn = yield self.store.create_appservice_txn(
+                service=service,
+                events=groups[service]
+            )
+            service_is_up = yield self._is_service_up(service)
+            if service_is_up:
+                sent = yield txn.send(self.as_api)
+                if sent:
+                    txn.complete(self.store)
+                else:
+                    self._start_recoverer(service)
+        # go round again in 1s (the "poll 1/s" arrow in the module docstring)
+        self.clock.call_later(1, self.start_polling)
+
+    @defer.inlineCallbacks
+    def on_recovered(self, recoverer):
+        self.recoverers.remove(recoverer)
+        logger.info("Successfully recovered application service AS ID %s",
+                    recoverer.service.id)
+        logger.info("Remaining active recoverers: %s", len(self.recoverers))
+        yield self.store.set_appservice_state(
+            recoverer.service,
+            ApplicationServiceState.UP
+        )
+
+    def add_recoverers(self, recoverers):
+        for r in recoverers:
+            self.recoverers.append(r)
+        if len(recoverers) > 0:
+            logger.info("New active recoverers: %s", len(self.recoverers))
+
+    @defer.inlineCallbacks
+    def _start_recoverer(self, service):
+        yield self.store.set_appservice_state(
+            service,
+            ApplicationServiceState.DOWN
+        )
+        logger.info(
+            "Application service falling behind. Starting recoverer. AS ID %s",
+            service.id
+        )
+        recoverer = self.recoverer_fn(service, self.on_recovered)
+        self.add_recoverers([recoverer])
+        recoverer.recover()
+
+    @defer.inlineCallbacks
+    def _is_service_up(self, service):
+        state = yield self.store.get_appservice_state(service)
+        # an AS with no recorded state has never been marked DOWN, so it is
+        # treated as up
+        defer.returnValue(state == ApplicationServiceState.UP or state is None)
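For reference while reading the controller above: recoverer_fn is the
create_recoverer closure from AppServiceScheduler.__init__, i.e.

    def create_recoverer(service, callback):
        return _Recoverer(clock, store, as_api, service, callback)

so _start_recoverer only has to supply the failing service and the
on_recovered callback; the clock, store and AS API are already bound.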
+
+
+class _Recoverer(object):
+    """Retries the oldest unsent txns for a DOWN AS with exponential backoff
+    until they all go through, then fires the callback to mark it UP again.
+    """
+
+    @staticmethod
+    @defer.inlineCallbacks
+    def start(clock, store, as_api, callback):
+        services = yield store.get_appservices_by_state(
+            ApplicationServiceState.DOWN
+        )
+        recoverers = [
+            _Recoverer(clock, store, as_api, s, callback) for s in services
+        ]
+        for r in recoverers:
+            logger.info("Starting recoverer for AS ID %s which was marked as "
+                        "DOWN", r.service.id)
+            r.recover()
+        defer.returnValue(recoverers)
+
+    def __init__(self, clock, store, as_api, service, callback):
+        self.clock = clock
+        self.store = store
+        self.as_api = as_api
+        self.service = service
+        self.callback = callback
+        self.backoff_counter = 1
+
+    def recover(self):
+        self.clock.call_later((2 ** self.backoff_counter), self.retry)
+
+    @defer.inlineCallbacks
+    def retry(self):
+        txn = yield self.store.get_oldest_unsent_txn(self.service)
+        if txn:
+            logger.info("Retrying transaction %s for AS ID %s",
+                        txn.id, txn.service.id)
+            sent = yield txn.send(self.as_api)
+            if sent:
+                yield txn.complete(self.store)
+                # reset the backoff counter and retry immediately
+                self.backoff_counter = 1
+                yield self.retry()
+            else:
+                # cap the backoff to be around 18h => (2^16) = 65536 secs
+                if self.backoff_counter < 16:
+                    self.backoff_counter += 1
+                self.recover()
+        else:
+            self._set_service_recovered()
+
+    def _set_service_recovered(self):
+        self.callback(self)
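To tie the retry logic back to the recovery diagram at the top of the file, a
worked trace for an AS whose oldest txn fails three times and then goes through:

    # recover() -> wait 2**1 = 2s  -> retry(): send fails    -> backoff_counter=2
    # recover() -> wait 2**2 = 4s  -> retry(): send fails    -> backoff_counter=3
    # recover() -> wait 2**3 = 8s  -> retry(): send fails    -> backoff_counter=4
    # recover() -> wait 2**4 = 16s -> retry(): send succeeds -> txn completed,
    #              backoff_counter reset to 1, retry() runs again immediately
    # retry()   -> no unsent txns left -> _set_service_recovered() fires the
    #              callback, and _TransactionController.on_recovered marks the
    #              AS as UP.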