summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/appservice/scheduler.py121
1 files changed, 111 insertions, 10 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index a5060808d3..3162fbec11 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -18,7 +18,7 @@ This module controls the reliability for application service transactions.
 The nominal flow through this module looks like:
                     ___________
   \O/ --- event -->|           |                           +--------------+
-   |  - event ---->| EventPool |<-- poll 1/s for events ---|  EventSorter |
+   |  - event ---->| event_pool|<-- poll 1/s for events ---|  EventSorter |
   / \ ---- event ->|___________|                           +--------------+
  USERS                                 ____________________________|
                                       |        |       |
@@ -29,7 +29,7 @@ The nominal flow through this module looks like:
                                       V
       -````````-            +------------+
       |````````|<--StoreTxn-|Transaction |
-      |Database|            |    Maker   |---> SEND TO AS
+      |Database|            | Controller |---> SEND TO AS
       `--------`            +------------+
 What happens on SEND TO AS depends on the state of the Application Service:
  - If the AS is marked as DOWN, do nothing.
@@ -49,20 +49,121 @@ UP & quit           +---------- YES                      SUCCESS
     |               |                                        |
     NO <--- Have more txns? <------ Mark txn success & nuke -+
                                       from db; incr AS pos.
+
+This is all tied together by the AppServiceScheduler which DIs the required
+components.
 """
 
 
-class EventPool(object):
-    pass
+class AppServiceScheduler(object):
+    """ Public facing API for this module. Does the required DI to tie the
+    components together. This also serves as the "event_pool", which in this
+    case is a simple array.
+    """
+
+    def __init__(self, store, as_api, services):
+        self.app_services = services
+        self.event_pool = []
+
+        def create_recoverer(service):
+            return _Recoverer(store, as_api, service)
+        self.txn_ctrl = _TransactionController(store, as_api, create_recoverer)
+
+        self.event_sorter = _EventSorter(self, self.txn_ctrl, services)
+
+    def start(self):
+        self.event_sorter.start_polling()
+
+    def store_event(self, event):  # event_pool
+        self.event_pool.append(event)
+
+    def get_events(self):  # event_pool
+        return self.event_pool
+
+
+class AppServiceTransaction(object):
+    """Represents an application service transaction."""
+
+    def __init__(self, service, id, events):
+        self.service = service
+        self.id = id
+        self.events = events
+
+    def send(self, as_api):
+        # sends this transaction using this as_api
+        pass
+
+    def complete(self, store):
+        # increment txn id on AS and nuke txn contents from db
+        pass
+
+
+class _EventSorter(object):
+
+    def __init__(self, event_pool, txn_ctrl, services):
+        self.event_pool = event_pool
+        self.txn_ctrl = txn_ctrl
+        self.services = services
+
+    def start_polling(self):
+        events = self.event_pool.get_events()
+        if events:
+            self._process(events)
+        # repoll later on
+
+    def _process(self, events):
+        # sort events
+        # f.e. (AS, events) => poke transaction controller
+        pass
+
+
+class _TransactionController(object):
+
+    def __init__(self, store, as_api, recoverer_fn):
+        self.store = store
+        self.as_api = as_api
+        self.recoverer_fn = recoverer_fn
+
+    def on_receive_events(self, service, events):
+        txn = self._store_txn(service, events)
+        if txn.send(self.as_api):
+            txn.complete(self.store)
+        else:
+            self._start_recoverer(service)
+
+    def _start_recoverer(self, service):
+        recoverer = self.recoverer_fn(service)
+        recoverer.recover()
+
+    def _store_txn(self, service, events):
+        pass  # returns AppServiceTransaction
+
+
+class _Recoverer(object):
 
+    def __init__(self, store, as_api, service):
+        self.store = store
+        self.as_api = as_api
+        self.service = service
+        self.backoff_counter = 1
 
-class EventSorter(object):
-    pass
+    def recover(self):
+        # TODO wait a bit
+        txn = self._get_oldest_txn()
+        if txn:
+            if txn.send(self.as_api):
+                txn.complete(self.store)
+            else:
+                self.backoff_counter += 1
+                self.recover(self.service)
+                return
+        else:
+            self._set_service_recovered(self.service)
 
+    def _set_service_recovered(self, service):
+        pass
 
-class TransactionMaker(object):
-    pass
+    def _get_oldest_txn(self):
+        pass  # returns AppServiceTransaction
 
 
-class Recoverer(object):
-    pass