summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/appservice/scheduler.py144
1 files changed, 85 insertions, 59 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 27271e468d..19fe8e11e8 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -16,17 +16,11 @@
 This module controls the reliability for application service transactions.
 
 The nominal flow through this module looks like:
-                    ___________
-  \O/ --- event -->|           |                           +--------------+
-   |  - event ---->| event_pool|<-- poll 1/s for events ---|  EventSorter |
-  / \ ---- event ->|___________|                           +--------------+
- USERS                                 ____________________________|
-                                      |        |       |
-                                      V        V       V
-                                     ASa       ASb     ASc
-                                    [e,e]      [e]   [e,e,e]
-                                      |
-                                      V
+             _________
+---ASa[e]-->|  Event  |
+----ASb[e]->| Grouper |<-poll 1/s--+
+--ASa[e]--->|_________|            | ASa[e,e]  ASb[e]
+                                   V
       -````````-            +------------+
       |````````|<--StoreTxn-|Transaction |
       |Database|            | Controller |---> SEND TO AS
@@ -43,11 +37,11 @@ Recoverer attempts to recover ASes who have died. The flow for this looks like:
                V                                               |
   START ---> Wait exp ------> Get oldest txn ID from ----> FAILURE
              backoff           DB and try to send it
-                                 ^                |__________
-Mark AS as                       |                           V
-UP & quit           +---------- YES                      SUCCESS
-    |               |                                        |
-    NO <--- Have more txns? <------ Mark txn success & nuke -+
+                                 ^                |___________
+Mark AS as                       |                            V
+UP & quit           +---------- YES                       SUCCESS
+    |               |                                         |
+    NO <--- Have more txns? <------ Mark txn success & nuke <-+
                                       from db; incr AS pos.
                                          Reset backoff.
 
@@ -62,24 +56,28 @@ class AppServiceScheduler(object):
     case is a simple array.
     """
 
-    def __init__(self, store, as_api, services):
-        self.app_services = services
-        self.event_pool = []
+    def __init__(self, clock, store, as_api):
+        self.clock = clock
+        self.store = store
+        self.as_api = as_api
+        self.event_grouper = _EventGrouper()
 
-        def create_recoverer(service):
-            return _Recoverer(store, as_api, service)
-        self.txn_ctrl = _TransactionController(store, as_api, create_recoverer)
+        def create_recoverer(service, callback):
+            return _Recoverer(clock, store, as_api, service, callback)
 
-        self.event_sorter = _EventSorter(self, self.txn_ctrl, services)
+        self.txn_ctrl = _TransactionController(
+            clock, store, as_api, self.event_grouper, create_recoverer
+        )
 
     def start(self):
-        self.event_sorter.start_polling()
-
-    def store_event(self, event):  # event_pool
-        self.event_pool.append(event)
+        # check for any DOWN ASes and start recoverers for them.
+        _Recoverer.start(
+            self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
+        )
+        self.txn_ctrl.start_polling()
 
-    def drain_events(self):  # event_pool
-        return self.event_pool
+    def submit_event_for_as(self, service, event):
+        self.event_grouper.on_receive(service, event)
 
 
 class AppServiceTransaction(object):
@@ -99,71 +97,99 @@ class AppServiceTransaction(object):
         pass
 
 
-class _EventSorter(object):
+class _EventGrouper(object):
+    """Groups events for the same application service together.
+    """
 
-    def __init__(self, event_pool, txn_ctrl, services):
-        self.event_pool = event_pool
-        self.txn_ctrl = txn_ctrl
-        self.services = services
+    def __init__(self):
+        self.groups = {}  # dict of {service: [events]}
 
-    def start_polling(self):
-        events = self.event_pool.drain_events()
-        if events:
-            self._process(events)
-        # TODO repoll later on
-
-    def _process(self, events):
-        # TODO sort events
-        # TODO fe (AS, events) => poke transaction controller on_receive_events
+    def on_receive(self, service, event):
+        # TODO group this
         pass
 
+    def drain_groups(self):
+        return self.groups
+
 
 class _TransactionController(object):
 
-    def __init__(self, store, as_api, recoverer_fn):
+    def __init__(self, clock, store, as_api, event_grouper, recoverer_fn):
+        self.clock = clock
         self.store = store
         self.as_api = as_api
+        self.event_grouper = event_grouper
         self.recoverer_fn = recoverer_fn
 
-    def on_receive_events(self, service, events):
-        txn = self._store_txn(service, events)
-        if txn.send(self.as_api):
-            txn.complete(self.store)
-        else:
-            self._start_recoverer(service)
+    def start_polling(self):
+        groups = self.event_grouper.drain_groups()
+        for service in groups:
+            txn_id = self._get_next_txn_id(service)
+            txn = AppServiceTransaction(service, txn_id, groups[service])
+            self._store_txn(txn)
+            if self._is_service_up(service):
+                if txn.send(self.as_api):
+                    txn.complete(self.store)
+                else:
+                    # TODO mark AS as down
+                    self._start_recoverer(service)
+        self.clock.call_later(1000, self.start_polling)
+
+
+    def on_recovered(self, service):
+        # TODO mark AS as UP
+        pass
 
     def _start_recoverer(self, service):
-        recoverer = self.recoverer_fn(service)
+        recoverer = self.recoverer_fn(service, self.on_recovered)
         recoverer.recover()
 
-    def _store_txn(self, service, events):
-        pass  # returns AppServiceTransaction
+    def _is_service_up(self, service):
+        pass
+
+    def _get_next_txn_id(self, service):
+        pass  # TODO work out the next txn_id for this service
+
+    def _store_txn(self, txn):
+        pass
 
 
 class _Recoverer(object):
 
-    def __init__(self, store, as_api, service):
+    @staticmethod
+    def start(clock, store, as_api, callback):
+        # TODO check for DOWN ASes and init recoverers
+        pass
+
+    def __init__(self, clock, store, as_api, service, callback):
+        self.clock = clock
         self.store = store
         self.as_api = as_api
         self.service = service
+        self.callback = callback
         self.backoff_counter = 1
 
     def recover(self):
-        # TODO wait a bit
+        self.clock.call_later(2000 ** self.backoff_counter, self.retry)
+
+    def retry(self):
         txn = self._get_oldest_txn()
         if txn:
             if txn.send(self.as_api):
                 txn.complete(self.store)
+                # reset the backoff counter and retry immediately
                 self.backoff_counter = 1
+                self.retry()
+                return
             else:
                 self.backoff_counter += 1
-                self.recover(self.service)
+                self.recover()
                 return
         else:
-            self._set_service_recovered(self.service)
+            self._set_service_recovered()
 
-    def _set_service_recovered(self, service):
-        pass
+    def _set_service_recovered(self):
+        self.callback(self.service)
 
     def _get_oldest_txn(self):
         pass  # returns AppServiceTransaction