Diffstat (limited to 'synapse/appservice')
-rw-r--r--  synapse/appservice/__init__.py    60
-rw-r--r--  synapse/appservice/api.py         22
-rw-r--r--  synapse/appservice/scheduler.py  254
3 files changed, 322 insertions(+), 14 deletions(-)
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index a268a6bcc4..63a18b802b 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -20,6 +20,50 @@ import re
 logger = logging.getLogger(__name__)
 
 
+class ApplicationServiceState(object):
+    DOWN = "down"
+    UP = "up"
+
+
+class AppServiceTransaction(object):
+    """Represents an application service transaction."""
+
+    def __init__(self, service, id, events):
+        self.service = service
+        self.id = id
+        self.events = events
+
+    def send(self, as_api):
+        """Sends this transaction using the provided AS API interface.
+
+        Args:
+            as_api(ApplicationServiceApi): The API to use to send.
+        Returns:
+            A Deferred which resolves to True if the transaction was sent.
+        """
+        return as_api.push_bulk(
+            service=self.service,
+            events=self.events,
+            txn_id=self.id
+        )
+
+    def complete(self, store):
+        """Completes this transaction as successful.
+
+        Marks this transaction ID on the application service and removes the
+        transaction contents from the database.
+
+        Args:
+            store: The database store to operate on.
+        Returns:
+            A Deferred which resolves to True if the transaction was completed.
+        """
+        return store.complete_appservice_txn(
+            service=self.service,
+            txn_id=self.id
+        )
+
+
 class ApplicationService(object):
     """Defines an application service. This definition is mostly what is
     provided to the /register AS API.
@@ -35,13 +79,13 @@ class ApplicationService(object):
     NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS]
 
     def __init__(self, token, url=None, namespaces=None, hs_token=None,
-                 sender=None, txn_id=None):
+                 sender=None, id=None):
         self.token = token
         self.url = url
         self.hs_token = hs_token
         self.sender = sender
         self.namespaces = self._check_namespaces(namespaces)
-        self.txn_id = txn_id
+        self.id = id
 
     def _check_namespaces(self, namespaces):
         # Sanity check that it is of the form:
@@ -51,7 +95,7 @@ class ApplicationService(object):
         #   rooms: [ {regex: "[A-z]+.*", exclusive: true}, ...],
         # }
         if not namespaces:
-            return None
+            namespaces = {}
 
         for ns in ApplicationService.NS_LIST:
             if ns not in namespaces:
@@ -155,7 +199,10 @@ class ApplicationService(object):
             return self._matches_user(event, member_list)
 
     def is_interested_in_user(self, user_id):
-        return self._matches_regex(user_id, ApplicationService.NS_USERS)
+        return (
+            self._matches_regex(user_id, ApplicationService.NS_USERS)
+            or user_id == self.sender
+        )
 
     def is_interested_in_alias(self, alias):
         return self._matches_regex(alias, ApplicationService.NS_ALIASES)
@@ -164,7 +211,10 @@ class ApplicationService(object):
         return self._matches_regex(room_id, ApplicationService.NS_ROOMS)
 
     def is_exclusive_user(self, user_id):
-        return self._is_exclusive(ApplicationService.NS_USERS, user_id)
+        return (
+            self._is_exclusive(ApplicationService.NS_USERS, user_id)
+            or user_id == self.sender
+        )
 
     def is_exclusive_alias(self, alias):
         return self._is_exclusive(ApplicationService.NS_ALIASES, alias)
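
The net effect of the __init__.py changes is that an application service is now
considered interested in (and exclusive for) its own sender user, in addition to
anything matched by its user namespace regexes. A minimal sketch of that
behaviour, assuming the namespace format shown in the _check_namespaces comment;
the token, URL, sender and regex values below are made up for illustration:

    from synapse.appservice import ApplicationService

    service = ApplicationService(
        token="example_as_token",
        url="http://localhost:9000",
        namespaces={
            "users": [{"regex": "@irc_.*", "exclusive": True}],
            "aliases": [],
            "rooms": [],
        },
        hs_token="example_hs_token",
        sender="@irc_bridge:example.org",
        id="example-irc-bridge",
    )

    assert service.is_interested_in_user("@irc_alice:example.org")   # matches the users regex
    assert service.is_interested_in_user("@irc_bridge:example.org")  # matches the sender (new here)
    assert not service.is_interested_in_user("@bob:example.org")
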
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index c2179f8d55..2a9becccb3 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -72,14 +72,19 @@ class ApplicationServiceApi(SimpleHttpClient):
         defer.returnValue(False)
 
     @defer.inlineCallbacks
-    def push_bulk(self, service, events):
+    def push_bulk(self, service, events, txn_id=None):
         events = self._serialize(events)
 
+        if txn_id is None:
+            logger.warning("push_bulk: Missing txn ID sending events to %s",
+                           service.url)
+            txn_id = str(0)
+        txn_id = str(txn_id)
+
         uri = service.url + ("/transactions/%s" %
-                             urllib.quote(str(0)))  # TODO txn_ids
-        response = None
+                             urllib.quote(txn_id))
         try:
-            response = yield self.put_json(
+            yield self.put_json(
                 uri=uri,
                 json_body={
                     "events": events
@@ -87,9 +92,8 @@ class ApplicationServiceApi(SimpleHttpClient):
                 args={
                     "access_token": service.hs_token
                 })
-            if response:  # just an empty json object
-                # TODO: Mark txn as sent successfully
-                defer.returnValue(True)
+            defer.returnValue(True)
+            return
         except CodeMessageException as e:
             logger.warning("push_bulk to %s received %s", uri, e.code)
         except Exception as ex:
@@ -97,8 +101,8 @@ class ApplicationServiceApi(SimpleHttpClient):
         defer.returnValue(False)
 
     @defer.inlineCallbacks
-    def push(self, service, event):
-        response = yield self.push_bulk(service, [event])
+    def push(self, service, event, txn_id=None):
+        response = yield self.push_bulk(service, [event], txn_id)
         defer.returnValue(response)
 
     def _serialize(self, events):
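
With the api.py changes, push_bulk() now PUTs to /transactions/<txn_id> on the
application service (falling back to "0" with a warning if no txn_id is given)
and resolves to True on success or False on failure. A rough usage sketch; the
as_api, service, events and txn_id values are assumed to already exist:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def send_with_txn_id(as_api, service, events, txn_id):
        # PUTs the serialized events to <service.url>/transactions/<txn_id>,
        # authenticating with service.hs_token as the access_token.
        sent = yield as_api.push_bulk(service=service, events=events, txn_id=txn_id)
        defer.returnValue(sent)  # True if the AS accepted the transaction, else False
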
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
new file mode 100644
index 0000000000..59b0b1f4ac
--- /dev/null
+++ b/synapse/appservice/scheduler.py
@@ -0,0 +1,254 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+This module controls the reliability of application service transactions.
+
+The nominal flow through this module looks like:
+              __________
+1---ASa[e]-->|  Service |--> Queue ASa[f]
+2----ASb[e]->|  Queuer  |
+3--ASa[f]--->|__________|-----------+ ASa[e], ASb[e]
+                                    V
+      -````````-            +------------+
+      |````````|<--StoreTxn-|Transaction |
+      |Database|            | Controller |---> SEND TO AS
+      `--------`            +------------+
+What happens on SEND TO AS depends on the state of the Application Service:
+ - If the AS is marked as DOWN, do nothing.
+ - If the AS is marked as UP, send the transaction.
+     * SUCCESS : Advance the AS's position (which txn it is up to) and nuke
+                 the txn contents from the db.
+     * FAILURE : Mark the AS as DOWN and start a Recoverer.
+
+Recoverer attempts to recover ASes which have died. The flow for this looks like:
+                ,--------------------- backoff++ --------------.
+               V                                               |
+  START ---> Wait exp ------> Get oldest txn ID from ----> FAILURE
+             backoff           DB and try to send it
+                                 ^                |___________
+Mark AS as                       |                            V
+UP & quit           +---------- YES                       SUCCESS
+    |               |                                         |
+    NO <--- Have more txns? <------ Mark txn success & nuke <-+
+                                      from db; incr AS pos.
+                                         Reset backoff.
+
+This is all tied together by the AppServiceScheduler, which dependency-injects
+(DIs) the required components.
+"""
+
+from synapse.appservice import ApplicationServiceState
+from twisted.internet import defer
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class AppServiceScheduler(object):
+    """ Public facing API for this module. Does the required DI to tie the
+    components together. This also serves as the "event_pool", which in this
+    case is a simple array.
+    """
+
+    def __init__(self, clock, store, as_api):
+        self.clock = clock
+        self.store = store
+        self.as_api = as_api
+
+        def create_recoverer(service, callback):
+            return _Recoverer(clock, store, as_api, service, callback)
+
+        self.txn_ctrl = _TransactionController(
+            clock, store, as_api, create_recoverer
+        )
+        self.queuer = _ServiceQueuer(self.txn_ctrl)
+
+    @defer.inlineCallbacks
+    def start(self):
+        logger.info("Starting appservice scheduler")
+        # check for any DOWN ASes and start recoverers for them.
+        recoverers = yield _Recoverer.start(
+            self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
+        )
+        self.txn_ctrl.add_recoverers(recoverers)
+
+    def submit_event_for_as(self, service, event):
+        self.queuer.enqueue(service, event)
+
+
+class _ServiceQueuer(object):
+    """Queues events for the same application service together, sending
+    transactions as soon as possible. Once a transaction is sent successfully,
+    this schedules any other events in the queue to run.
+    """
+
+    def __init__(self, txn_ctrl):
+        self.queued_events = {}  # dict of {service_id: [events]}
+        self.pending_requests = {}  # dict of {service_id: Deferred}
+        self.txn_ctrl = txn_ctrl
+
+    def enqueue(self, service, event):
+        # if this service isn't being sent something
+        if not self.pending_requests.get(service.id):
+            self._send_request(service, [event])
+        else:
+            # add to queue for this service
+            if service.id not in self.queued_events:
+                self.queued_events[service.id] = []
+            self.queued_events[service.id].append(event)
+
+    def _send_request(self, service, events):
+        # send request and add callbacks
+        d = self.txn_ctrl.send(service, events)
+        d.addBoth(self._on_request_finish)
+        d.addErrback(self._on_request_fail)
+        self.pending_requests[service.id] = d
+
+    def _on_request_finish(self, service):
+        self.pending_requests[service.id] = None
+        # if there are queued events, then send them.
+        if (service.id in self.queued_events
+                and len(self.queued_events[service.id]) > 0):
+            self._send_request(service, self.queued_events[service.id])
+            self.queued_events[service.id] = []
+
+    def _on_request_fail(self, err):
+        logger.error("AS request failed: %s", err)
+
+
+class _TransactionController(object):
+
+    def __init__(self, clock, store, as_api, recoverer_fn):
+        self.clock = clock
+        self.store = store
+        self.as_api = as_api
+        self.recoverer_fn = recoverer_fn
+        # keep track of how many recoverers there are
+        self.recoverers = []
+
+    @defer.inlineCallbacks
+    def send(self, service, events):
+        try:
+            txn = yield self.store.create_appservice_txn(
+                service=service,
+                events=events
+            )
+            service_is_up = yield self._is_service_up(service)
+            if service_is_up:
+                sent = yield txn.send(self.as_api)
+                if sent:
+                    txn.complete(self.store)
+                else:
+                    self._start_recoverer(service)
+        except Exception as e:
+            logger.exception(e)
+            self._start_recoverer(service)
+        # request has finished
+        defer.returnValue(service)
+
+    @defer.inlineCallbacks
+    def on_recovered(self, recoverer):
+        self.recoverers.remove(recoverer)
+        logger.info("Successfully recovered application service AS ID %s",
+                    recoverer.service.id)
+        logger.info("Remaining active recoverers: %s", len(self.recoverers))
+        yield self.store.set_appservice_state(
+            recoverer.service,
+            ApplicationServiceState.UP
+        )
+
+    def add_recoverers(self, recoverers):
+        for r in recoverers:
+            self.recoverers.append(r)
+        if len(recoverers) > 0:
+            logger.info("New active recoverers: %s", len(self.recoverers))
+
+    @defer.inlineCallbacks
+    def _start_recoverer(self, service):
+        yield self.store.set_appservice_state(
+            service,
+            ApplicationServiceState.DOWN
+        )
+        logger.info(
+            "Application service falling behind. Starting recoverer. AS ID %s",
+            service.id
+        )
+        recoverer = self.recoverer_fn(service, self.on_recovered)
+        self.add_recoverers([recoverer])
+        recoverer.recover()
+
+    @defer.inlineCallbacks
+    def _is_service_up(self, service):
+        state = yield self.store.get_appservice_state(service)
+        defer.returnValue(state == ApplicationServiceState.UP or state is None)
+
+
+class _Recoverer(object):
+
+    @staticmethod
+    @defer.inlineCallbacks
+    def start(clock, store, as_api, callback):
+        services = yield store.get_appservices_by_state(
+            ApplicationServiceState.DOWN
+        )
+        recoverers = [
+            _Recoverer(clock, store, as_api, s, callback) for s in services
+        ]
+        for r in recoverers:
+            logger.info("Starting recoverer for AS ID %s which was marked as "
+                        "DOWN", r.service.id)
+            r.recover()
+        defer.returnValue(recoverers)
+
+    def __init__(self, clock, store, as_api, service, callback):
+        self.clock = clock
+        self.store = store
+        self.as_api = as_api
+        self.service = service
+        self.callback = callback
+        self.backoff_counter = 1
+
+    def recover(self):
+        self.clock.call_later((2 ** self.backoff_counter), self.retry)
+
+    def _backoff(self):
+        # cap the backoff to be around 18h => (2^16) = 65536 secs
+        if self.backoff_counter < 16:
+            self.backoff_counter += 1
+        self.recover()
+
+    @defer.inlineCallbacks
+    def retry(self):
+        try:
+            txn = yield self.store.get_oldest_unsent_txn(self.service)
+            if txn:
+                logger.info("Retrying transaction %s for AS ID %s",
+                            txn.id, txn.service.id)
+                sent = yield txn.send(self.as_api)
+                if sent:
+                    yield txn.complete(self.store)
+                    # reset the backoff counter and retry immediately
+                    self.backoff_counter = 1
+                    yield self.retry()
+                else:
+                    self._backoff()
+            else:
+                self._set_service_recovered()
+        except Exception as e:
+            logger.exception(e)
+            self._backoff()
+
+    def _set_service_recovered(self):
+        self.callback(self)
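
Taken together, scheduler.py provides the reliability layer described in its
module docstring. A rough wiring sketch follows; the hs object, its accessors
and the ApplicationServiceApi constructor call are assumptions made for
illustration, not part of this commit:

    from synapse.appservice.api import ApplicationServiceApi
    from synapse.appservice.scheduler import AppServiceScheduler

    # hs and its accessors are assumed to exist on the homeserver side.
    scheduler = AppServiceScheduler(
        clock=hs.get_clock(),
        store=hs.get_datastore(),
        as_api=ApplicationServiceApi(hs),
    )
    scheduler.start()  # starts _Recoverers for any AS already marked as DOWN

    # For each event an interested application service should receive:
    scheduler.submit_event_for_as(service, event)

If a send fails, the transaction stays in the database, the AS is marked DOWN,
and a _Recoverer retries the oldest unsent transaction on an exponential
backoff (2, 4, 8, ... seconds, capped at 2**16 = 65536 seconds, roughly 18
hours), marking the service UP again once the backlog is drained.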