diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 27271e468d..19fe8e11e8 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -16,17 +16,11 @@
This module controls the reliability for application service transactions.
The nominal flow through this module looks like:
- ___________
- \O/ --- event -->| | +--------------+
- | - event ---->| event_pool|<-- poll 1/s for events ---| EventSorter |
- / \ ---- event ->|___________| +--------------+
- USERS ____________________________|
- | | |
- V V V
- ASa ASb ASc
- [e,e] [e] [e,e,e]
- |
- V
+ _________
+---ASa[e]-->| Event |
+----ASb[e]->| Grouper |<-poll 1/s--+
+--ASa[e]--->|_________| | ASa[e,e] ASb[e]
+ V
-````````- +------------+
|````````|<--StoreTxn-|Transaction |
|Database| | Controller |---> SEND TO AS
@@ -43,11 +37,11 @@ Recoverer attempts to recover ASes who have died. The flow for this looks like:
V |
START ---> Wait exp ------> Get oldest txn ID from ----> FAILURE
backoff DB and try to send it
- ^ |__________
-Mark AS as | V
-UP & quit +---------- YES SUCCESS
- | | |
- NO <--- Have more txns? <------ Mark txn success & nuke -+
+ ^ |___________
+Mark AS as | V
+UP & quit +---------- YES SUCCESS
+ | | |
+ NO <--- Have more txns? <------ Mark txn success & nuke <-+
from db; incr AS pos.
Reset backoff.
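
To make the "Wait exp backoff" step above concrete: assuming the delay used below is 2 ** backoff_counter seconds, with the counter starting at 1 and growing on each failure, the retry schedule works out as follows (a sketch of the arithmetic only, not code in this patch):

    # Retry delays implied by the diagram, assuming 2 ** backoff_counter
    # seconds and a counter that starts at 1 and grows on each failure.
    for backoff_counter in range(1, 6):
        print("attempt %d waits %ds" % (backoff_counter, 2 ** backoff_counter))
    # attempt 1 waits 2s
    # attempt 2 waits 4s
    # attempt 3 waits 8s
    # attempt 4 waits 16s
    # attempt 5 waits 32s
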
@@ -62,24 +56,28 @@ class AppServiceScheduler(object):
case is a simple array.
"""
- def __init__(self, store, as_api, services):
- self.app_services = services
- self.event_pool = []
+ def __init__(self, clock, store, as_api):
+ self.clock = clock
+ self.store = store
+ self.as_api = as_api
+ self.event_grouper = _EventGrouper()
- def create_recoverer(service):
- return _Recoverer(store, as_api, service)
- self.txn_ctrl = _TransactionController(store, as_api, create_recoverer)
+ def create_recoverer(service, callback):
+ return _Recoverer(clock, store, as_api, service, callback)
- self.event_sorter = _EventSorter(self, self.txn_ctrl, services)
+ self.txn_ctrl = _TransactionController(
+ clock, store, as_api, self.event_grouper, create_recoverer
+ )
def start(self):
- self.event_sorter.start_polling()
-
- def store_event(self, event): # event_pool
- self.event_pool.append(event)
+ # check for any DOWN ASes and start recoverers for them.
+ _Recoverer.start(
+ self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
+ )
+ self.txn_ctrl.start_polling()
- def drain_events(self): # event_pool
- return self.event_pool
+ def submit_event_for_as(self, service, event):
+ self.event_grouper.on_receive(service, event)
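
For context, a rough sketch of how the reworked scheduler might be driven by its caller; the clock/store/as_api objects and the event source below are placeholders for this sketch, not part of this patch:

    # Hypothetical wiring: clock, store and as_api stand in for whatever the
    # homeserver passes in; incoming_events is a placeholder event source.
    scheduler = AppServiceScheduler(clock, store, as_api)
    scheduler.start()  # kicks off recoverers for DOWN ASes and the polling loop

    for service, event in incoming_events:
        # Events are buffered per service by the _EventGrouper until the
        # _TransactionController drains them on its next poll.
        scheduler.submit_event_for_as(service, event)
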
class AppServiceTransaction(object):
@@ -99,71 +97,99 @@ class AppServiceTransaction(object):
pass
-class _EventSorter(object):
+class _EventGrouper(object):
+ """Groups events for the same application service together.
+ """
- def __init__(self, event_pool, txn_ctrl, services):
- self.event_pool = event_pool
- self.txn_ctrl = txn_ctrl
- self.services = services
+ def __init__(self):
+ self.groups = {} # dict of {service: [events]}
- def start_polling(self):
- events = self.event_pool.drain_events()
- if events:
- self._process(events)
- # TODO repoll later on
-
- def _process(self, events):
- # TODO sort events
- # TODO fe (AS, events) => poke transaction controller on_receive_events
+ def on_receive(self, service, event):
+ # TODO group this
pass
+ def drain_groups(self):
+ return self.groups
+
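
The grouping itself is still a TODO above; one way the two methods could be filled in later, keeping the {service: [events]} shape noted in __init__, is sketched here (not part of this patch):

    def on_receive(self, service, event):
        # Append the event to this service's bucket, creating the bucket on
        # first use.
        self.groups.setdefault(service, []).append(event)

    def drain_groups(self):
        # Hand back everything buffered so far and start with fresh buckets,
        # so each poll only sees events received since the last drain.
        groups = self.groups
        self.groups = {}
        return groups
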
class _TransactionController(object):
- def __init__(self, store, as_api, recoverer_fn):
+ def __init__(self, clock, store, as_api, event_grouper, recoverer_fn):
+ self.clock = clock
self.store = store
self.as_api = as_api
+ self.event_grouper = event_grouper
self.recoverer_fn = recoverer_fn
- def on_receive_events(self, service, events):
- txn = self._store_txn(service, events)
- if txn.send(self.as_api):
- txn.complete(self.store)
- else:
- self._start_recoverer(service)
+ def start_polling(self):
+ groups = self.event_grouper.drain_groups()
+ for service in groups:
+ txn_id = self._get_next_txn_id(service)
+ txn = AppServiceTransaction(service, txn_id, groups[service])
+ self._store_txn(txn)
+ if self._is_service_up(service):
+ if txn.send(self.as_api):
+ txn.complete(self.store)
+ else:
+ # TODO mark AS as down
+ self._start_recoverer(service)
+        # Poll again in one second ("poll 1/s" in the module docstring).
+        self.clock.call_later(1, self.start_polling)
+
+ def on_recovered(self, service):
+ # TODO mark AS as UP
+ pass
def _start_recoverer(self, service):
- recoverer = self.recoverer_fn(service)
+ recoverer = self.recoverer_fn(service, self.on_recovered)
recoverer.recover()
- def _store_txn(self, service, events):
- pass # returns AppServiceTransaction
+ def _is_service_up(self, service):
+ pass
+
+ def _get_next_txn_id(self, service):
+ pass # TODO work out the next txn_id for this service
+
+ def _store_txn(self, txn):
+ pass
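
The service-state helpers are also stubs for now; as a rough illustration of the intent only (the store methods named here are invented for the sketch and do not exist yet):

    def _is_service_up(self, service):
        # Hypothetical store lookup of the last recorded AS state.
        return self.store.get_appservice_state(service) == "UP"  # invented API

    def _get_next_txn_id(self, service):
        # Hypothetical per-service counter persisted in the store so that
        # transaction IDs keep increasing across restarts.
        return self.store.get_next_txn_id_for_appservice(service)  # invented API
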
class _Recoverer(object):
- def __init__(self, store, as_api, service):
+ @staticmethod
+ def start(clock, store, as_api, callback):
+ # TODO check for DOWN ASes and init recoverers
+ pass
+
+ def __init__(self, clock, store, as_api, service, callback):
+ self.clock = clock
self.store = store
self.as_api = as_api
self.service = service
+ self.callback = callback
self.backoff_counter = 1
def recover(self):
- # TODO wait a bit
+        # Exponential backoff: wait 2, 4, 8, ... seconds before retrying.
+        self.clock.call_later(2 ** self.backoff_counter, self.retry)
+
+ def retry(self):
txn = self._get_oldest_txn()
if txn:
if txn.send(self.as_api):
txn.complete(self.store)
+ # reset the backoff counter and retry immediately
self.backoff_counter = 1
+ self.retry()
+ return
else:
self.backoff_counter += 1
- self.recover(self.service)
+ self.recover()
return
else:
- self._set_service_recovered(self.service)
+ self._set_service_recovered()
- def _set_service_recovered(self, service):
- pass
+ def _set_service_recovered(self):
+ self.callback(self.service)
def _get_oldest_txn(self):
pass # returns AppServiceTransaction
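
As a quick way to poke the recoverer by hand, here is some throwaway scaffolding (nothing below is in the codebase; it assumes the 2 ** backoff_counter delay used in recover() above):

    class FakeClock(object):
        def __init__(self):
            self.calls = []  # recorded (delay_in_seconds, callback) pairs

        def call_later(self, delay, callback):
            self.calls.append((delay, callback))

    clock = FakeClock()
    recovered = []
    recoverer = _Recoverer(clock, None, None, "ASa", recovered.append)

    recoverer.recover()
    assert clock.calls[0][0] == 2   # first wait is 2 ** 1 seconds
    clock.calls[0][1]()             # fire the scheduled retry by hand
    assert recovered == ["ASa"]     # _get_oldest_txn() is still a stub that
                                    # returns None, so recovery is reported
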