summary refs log tree commit diff
path: root/synapse/appservice
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/appservice')
-rw-r--r--synapse/appservice/scheduler.py110
1 files changed, 65 insertions, 45 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 42a350bff8..03a14402c5 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -70,35 +70,37 @@ class ApplicationServiceScheduler(object):
         self.store = hs.get_datastore()
         self.as_api = hs.get_application_service_api()
 
-        def create_recoverer(service, callback):
-            return _Recoverer(self.clock, self.store, self.as_api, service, callback)
-
-        self.txn_ctrl = _TransactionController(
-            self.clock, self.store, self.as_api, create_recoverer
-        )
+        self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
         self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
 
     @defer.inlineCallbacks
     def start(self):
         logger.info("Starting appservice scheduler")
+
         # check for any DOWN ASes and start recoverers for them.
-        recoverers = yield _Recoverer.start(
-            self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
+        services = yield self.store.get_appservices_by_state(
+            ApplicationServiceState.DOWN
         )
-        self.txn_ctrl.add_recoverers(recoverers)
+
+        for service in services:
+            self.txn_ctrl.start_recoverer(service)
 
     def submit_event_for_as(self, service, event):
         self.queuer.enqueue(service, event)
 
 
 class _ServiceQueuer(object):
-    """Queues events for the same application service together, sending
-    transactions as soon as possible. Once a transaction is sent successfully,
-    this schedules any other events in the queue to run.
+    """Queue of events waiting to be sent to appservices.
+
+    Groups events into transactions per-appservice, and sends them on to the
+    TransactionController. Makes sure that we only have one transaction in flight per
+    appservice at a given time.
     """
 
     def __init__(self, txn_ctrl, clock):
         self.queued_events = {}  # dict of {service_id: [events]}
+
+        # the appservices which currently have a transaction in flight
         self.requests_in_flight = set()
         self.txn_ctrl = txn_ctrl
         self.clock = clock
@@ -136,13 +138,29 @@ class _ServiceQueuer(object):
 
 
 class _TransactionController(object):
-    def __init__(self, clock, store, as_api, recoverer_fn):
+    """Transaction manager.
+
+    Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer
+    if a transaction fails.
+
+    (Note we have only have one of these in the homeserver.)
+
+    Args:
+        clock (synapse.util.Clock):
+        store (synapse.storage.DataStore):
+        as_api (synapse.appservice.api.ApplicationServiceApi):
+    """
+
+    def __init__(self, clock, store, as_api):
         self.clock = clock
         self.store = store
         self.as_api = as_api
-        self.recoverer_fn = recoverer_fn
-        # keep track of how many recoverers there are
-        self.recoverers = []
+
+        # map from service id to recoverer instance
+        self.recoverers = {}
+
+        # for UTs
+        self.RECOVERER_CLASS = _Recoverer
 
     @defer.inlineCallbacks
     def send(self, service, events):
@@ -154,42 +172,45 @@ class _TransactionController(object):
                 if sent:
                     yield txn.complete(self.store)
                 else:
-                    run_in_background(self._start_recoverer, service)
+                    run_in_background(self._on_txn_fail, service)
         except Exception:
             logger.exception("Error creating appservice transaction")
-            run_in_background(self._start_recoverer, service)
+            run_in_background(self._on_txn_fail, service)
 
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):
-        self.recoverers.remove(recoverer)
         logger.info(
             "Successfully recovered application service AS ID %s", recoverer.service.id
         )
+        self.recoverers.pop(recoverer.service.id)
         logger.info("Remaining active recoverers: %s", len(self.recoverers))
         yield self.store.set_appservice_state(
             recoverer.service, ApplicationServiceState.UP
         )
 
-    def add_recoverers(self, recoverers):
-        for r in recoverers:
-            self.recoverers.append(r)
-        if len(recoverers) > 0:
-            logger.info("New active recoverers: %s", len(self.recoverers))
-
     @defer.inlineCallbacks
-    def _start_recoverer(self, service):
+    def _on_txn_fail(self, service):
         try:
             yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
-            logger.info(
-                "Application service falling behind. Starting recoverer. AS ID %s",
-                service.id,
-            )
-            recoverer = self.recoverer_fn(service, self.on_recovered)
-            self.add_recoverers([recoverer])
-            recoverer.recover()
+            self.start_recoverer(service)
         except Exception:
             logger.exception("Error starting AS recoverer")
 
+    def start_recoverer(self, service):
+        """Start a Recoverer for the given service
+
+        Args:
+            service (synapse.appservice.ApplicationService):
+        """
+        logger.info("Starting recoverer for AS ID %s", service.id)
+        assert service.id not in self.recoverers
+        recoverer = self.RECOVERER_CLASS(
+            self.clock, self.store, self.as_api, service, self.on_recovered
+        )
+        self.recoverers[service.id] = recoverer
+        recoverer.recover()
+        logger.info("Now %i active recoverers", len(self.recoverers))
+
     @defer.inlineCallbacks
     def _is_service_up(self, service):
         state = yield self.store.get_appservice_state(service)
@@ -197,18 +218,17 @@ class _TransactionController(object):
 
 
 class _Recoverer(object):
-    @staticmethod
-    @defer.inlineCallbacks
-    def start(clock, store, as_api, callback):
-        services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
-        recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
-        for r in recoverers:
-            logger.info(
-                "Starting recoverer for AS ID %s which was marked as " "DOWN",
-                r.service.id,
-            )
-            r.recover()
-        return recoverers
+    """Manages retries and backoff for a DOWN appservice.
+
+    We have one of these for each appservice which is currently considered DOWN.
+
+    Args:
+        clock (synapse.util.Clock):
+        store (synapse.storage.DataStore):
+        as_api (synapse.appservice.api.ApplicationServiceApi):
+        service (synapse.appservice.ApplicationService): the service we are managing
+        callback (callable[_Recoverer]): called once the service recovers.
+    """
 
     def __init__(self, clock, store, as_api, service, callback):
         self.clock = clock