diff --git a/changelog.d/5886.misc b/changelog.d/5886.misc
new file mode 100644
index 0000000000..22adba3d85
--- /dev/null
+++ b/changelog.d/5886.misc
@@ -0,0 +1 @@
+Refactor the Appservice scheduler code.
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 0ae12cbac9..9998f822f1 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -70,35 +70,37 @@ class ApplicationServiceScheduler(object):
self.store = hs.get_datastore()
self.as_api = hs.get_application_service_api()
- def create_recoverer(service, callback):
- return _Recoverer(self.clock, self.store, self.as_api, service, callback)
-
- self.txn_ctrl = _TransactionController(
- self.clock, self.store, self.as_api, create_recoverer
- )
+ self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
@defer.inlineCallbacks
def start(self):
logger.info("Starting appservice scheduler")
+
# check for any DOWN ASes and start recoverers for them.
- recoverers = yield _Recoverer.start(
- self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
+ services = yield self.store.get_appservices_by_state(
+ ApplicationServiceState.DOWN
)
- self.txn_ctrl.add_recoverers(recoverers)
+
+ for service in services:
+ self.txn_ctrl.start_recoverer(service)
def submit_event_for_as(self, service, event):
self.queuer.enqueue(service, event)
class _ServiceQueuer(object):
- """Queues events for the same application service together, sending
- transactions as soon as possible. Once a transaction is sent successfully,
- this schedules any other events in the queue to run.
+ """Queue of events waiting to be sent to appservices.
+
+ Groups events into transactions per-appservice, and sends them on to the
+ TransactionController. Makes sure that we only have one transaction in flight per
+ appservice at a given time.
"""
def __init__(self, txn_ctrl, clock):
self.queued_events = {} # dict of {service_id: [events]}
+
+ # the appservices which currently have a transaction in flight
self.requests_in_flight = set()
self.txn_ctrl = txn_ctrl
self.clock = clock
@@ -136,13 +138,29 @@ class _ServiceQueuer(object):
class _TransactionController(object):
- def __init__(self, clock, store, as_api, recoverer_fn):
+ """Transaction manager.
+
+ Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer
+ if a transaction fails.
+
+ (Note we have only have one of these in the homeserver.)
+
+ Args:
+ clock (synapse.util.Clock):
+ store (synapse.storage.DataStore):
+ as_api (synapse.appservice.api.ApplicationServiceApi):
+ """
+
+ def __init__(self, clock, store, as_api):
self.clock = clock
self.store = store
self.as_api = as_api
- self.recoverer_fn = recoverer_fn
- # keep track of how many recoverers there are
- self.recoverers = []
+
+ # map from service id to recoverer instance
+ self.recoverers = {}
+
+ # for UTs
+ self.RECOVERER_CLASS = _Recoverer
@defer.inlineCallbacks
def send(self, service, events):
@@ -154,42 +172,45 @@ class _TransactionController(object):
if sent:
yield txn.complete(self.store)
else:
- run_in_background(self._start_recoverer, service)
+ run_in_background(self._on_txn_fail, service)
except Exception:
logger.exception("Error creating appservice transaction")
- run_in_background(self._start_recoverer, service)
+ run_in_background(self._on_txn_fail, service)
@defer.inlineCallbacks
def on_recovered(self, recoverer):
- self.recoverers.remove(recoverer)
logger.info(
"Successfully recovered application service AS ID %s", recoverer.service.id
)
+ self.recoverers.pop(recoverer.service.id)
logger.info("Remaining active recoverers: %s", len(self.recoverers))
yield self.store.set_appservice_state(
recoverer.service, ApplicationServiceState.UP
)
- def add_recoverers(self, recoverers):
- for r in recoverers:
- self.recoverers.append(r)
- if len(recoverers) > 0:
- logger.info("New active recoverers: %s", len(self.recoverers))
-
@defer.inlineCallbacks
- def _start_recoverer(self, service):
+ def _on_txn_fail(self, service):
try:
yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
- logger.info(
- "Application service falling behind. Starting recoverer. AS ID %s",
- service.id,
- )
- recoverer = self.recoverer_fn(service, self.on_recovered)
- self.add_recoverers([recoverer])
- recoverer.recover()
+ self.start_recoverer(service)
except Exception:
logger.exception("Error starting AS recoverer")
+ def start_recoverer(self, service):
+ """Start a Recoverer for the given service
+
+ Args:
+ service (synapse.appservice.ApplicationService):
+ """
+ logger.info("Starting recoverer for AS ID %s", service.id)
+ assert service.id not in self.recoverers
+ recoverer = self.RECOVERER_CLASS(
+ self.clock, self.store, self.as_api, service, self.on_recovered
+ )
+ self.recoverers[service.id] = recoverer
+ recoverer.recover()
+ logger.info("Now %i active recoverers", len(self.recoverers))
+
@defer.inlineCallbacks
def _is_service_up(self, service):
state = yield self.store.get_appservice_state(service)
@@ -197,18 +218,17 @@ class _TransactionController(object):
class _Recoverer(object):
- @staticmethod
- @defer.inlineCallbacks
- def start(clock, store, as_api, callback):
- services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
- recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
- for r in recoverers:
- logger.info(
- "Starting recoverer for AS ID %s which was marked as " "DOWN",
- r.service.id,
- )
- r.recover()
- return recoverers
+ """Manages retries and backoff for a DOWN appservice.
+
+ We have one of these for each appservice which is currently considered DOWN.
+
+ Args:
+ clock (synapse.util.Clock):
+ store (synapse.storage.DataStore):
+ as_api (synapse.appservice.api.ApplicationServiceApi):
+ service (synapse.appservice.ApplicationService): the service we are managing
+ callback (callable[_Recoverer]): called once the service recovers.
+ """
def __init__(self, clock, store, as_api, service, callback):
self.clock = clock
diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py
index 04b8c2c07c..52f89d3f83 100644
--- a/tests/appservice/test_scheduler.py
+++ b/tests/appservice/test_scheduler.py
@@ -37,11 +37,9 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
self.recoverer = Mock()
self.recoverer_fn = Mock(return_value=self.recoverer)
self.txnctrl = _TransactionController(
- clock=self.clock,
- store=self.store,
- as_api=self.as_api,
- recoverer_fn=self.recoverer_fn,
+ clock=self.clock, store=self.store, as_api=self.as_api
)
+ self.txnctrl.RECOVERER_CLASS = self.recoverer_fn
def test_single_service_up_txn_sent(self):
# Test: The AS is up and the txn is successfully sent.
|