summary refs log tree commit diff
path: root/synapse/appservice
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/appservice')
-rw-r--r--synapse/appservice/scheduler.py153
1 files changed, 90 insertions, 63 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 42a350bff8..9998f822f1 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -70,35 +70,37 @@ class ApplicationServiceScheduler(object):
         self.store = hs.get_datastore()
         self.as_api = hs.get_application_service_api()
 
-        def create_recoverer(service, callback):
-            return _Recoverer(self.clock, self.store, self.as_api, service, callback)
-
-        self.txn_ctrl = _TransactionController(
-            self.clock, self.store, self.as_api, create_recoverer
-        )
+        self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
         self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
 
     @defer.inlineCallbacks
     def start(self):
         logger.info("Starting appservice scheduler")
+
         # check for any DOWN ASes and start recoverers for them.
-        recoverers = yield _Recoverer.start(
-            self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
+        services = yield self.store.get_appservices_by_state(
+            ApplicationServiceState.DOWN
         )
-        self.txn_ctrl.add_recoverers(recoverers)
+
+        for service in services:
+            self.txn_ctrl.start_recoverer(service)
 
     def submit_event_for_as(self, service, event):
         self.queuer.enqueue(service, event)
 
 
 class _ServiceQueuer(object):
-    """Queues events for the same application service together, sending
-    transactions as soon as possible. Once a transaction is sent successfully,
-    this schedules any other events in the queue to run.
+    """Queue of events waiting to be sent to appservices.
+
+    Groups events into transactions per-appservice, and sends them on to the
+    TransactionController. Makes sure that we only have one transaction in flight per
+    appservice at a given time.
     """
 
     def __init__(self, txn_ctrl, clock):
         self.queued_events = {}  # dict of {service_id: [events]}
+
+        # the appservices which currently have a transaction in flight
         self.requests_in_flight = set()
         self.txn_ctrl = txn_ctrl
         self.clock = clock
@@ -136,13 +138,29 @@ class _ServiceQueuer(object):
 
 
 class _TransactionController(object):
-    def __init__(self, clock, store, as_api, recoverer_fn):
+    """Transaction manager.
+
+    Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer
+    if a transaction fails.
+
+    (Note we have only have one of these in the homeserver.)
+
+    Args:
+        clock (synapse.util.Clock):
+        store (synapse.storage.DataStore):
+        as_api (synapse.appservice.api.ApplicationServiceApi):
+    """
+
+    def __init__(self, clock, store, as_api):
         self.clock = clock
         self.store = store
         self.as_api = as_api
-        self.recoverer_fn = recoverer_fn
-        # keep track of how many recoverers there are
-        self.recoverers = []
+
+        # map from service id to recoverer instance
+        self.recoverers = {}
+
+        # for UTs
+        self.RECOVERER_CLASS = _Recoverer
 
     @defer.inlineCallbacks
     def send(self, service, events):
@@ -154,42 +172,45 @@ class _TransactionController(object):
                 if sent:
                     yield txn.complete(self.store)
                 else:
-                    run_in_background(self._start_recoverer, service)
+                    run_in_background(self._on_txn_fail, service)
         except Exception:
             logger.exception("Error creating appservice transaction")
-            run_in_background(self._start_recoverer, service)
+            run_in_background(self._on_txn_fail, service)
 
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):
-        self.recoverers.remove(recoverer)
         logger.info(
             "Successfully recovered application service AS ID %s", recoverer.service.id
         )
+        self.recoverers.pop(recoverer.service.id)
         logger.info("Remaining active recoverers: %s", len(self.recoverers))
         yield self.store.set_appservice_state(
             recoverer.service, ApplicationServiceState.UP
         )
 
-    def add_recoverers(self, recoverers):
-        for r in recoverers:
-            self.recoverers.append(r)
-        if len(recoverers) > 0:
-            logger.info("New active recoverers: %s", len(self.recoverers))
-
     @defer.inlineCallbacks
-    def _start_recoverer(self, service):
+    def _on_txn_fail(self, service):
         try:
             yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
-            logger.info(
-                "Application service falling behind. Starting recoverer. AS ID %s",
-                service.id,
-            )
-            recoverer = self.recoverer_fn(service, self.on_recovered)
-            self.add_recoverers([recoverer])
-            recoverer.recover()
+            self.start_recoverer(service)
         except Exception:
             logger.exception("Error starting AS recoverer")
 
+    def start_recoverer(self, service):
+        """Start a Recoverer for the given service
+
+        Args:
+            service (synapse.appservice.ApplicationService):
+        """
+        logger.info("Starting recoverer for AS ID %s", service.id)
+        assert service.id not in self.recoverers
+        recoverer = self.RECOVERER_CLASS(
+            self.clock, self.store, self.as_api, service, self.on_recovered
+        )
+        self.recoverers[service.id] = recoverer
+        recoverer.recover()
+        logger.info("Now %i active recoverers", len(self.recoverers))
+
     @defer.inlineCallbacks
     def _is_service_up(self, service):
         state = yield self.store.get_appservice_state(service)
@@ -197,18 +218,17 @@ class _TransactionController(object):
 
 
 class _Recoverer(object):
-    @staticmethod
-    @defer.inlineCallbacks
-    def start(clock, store, as_api, callback):
-        services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
-        recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
-        for r in recoverers:
-            logger.info(
-                "Starting recoverer for AS ID %s which was marked as " "DOWN",
-                r.service.id,
-            )
-            r.recover()
-        return recoverers
+    """Manages retries and backoff for a DOWN appservice.
+
+    We have one of these for each appservice which is currently considered DOWN.
+
+    Args:
+        clock (synapse.util.Clock):
+        store (synapse.storage.DataStore):
+        as_api (synapse.appservice.api.ApplicationServiceApi):
+        service (synapse.appservice.ApplicationService): the service we are managing
+        callback (callable[_Recoverer]): called once the service recovers.
+    """
 
     def __init__(self, clock, store, as_api, service, callback):
         self.clock = clock
@@ -224,7 +244,9 @@ class _Recoverer(object):
                 "as-recoverer-%s" % (self.service.id,), self.retry
             )
 
-        self.clock.call_later((2 ** self.backoff_counter), _retry)
+        delay = 2 ** self.backoff_counter
+        logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
+        self.clock.call_later(delay, _retry)
 
     def _backoff(self):
         # cap the backoff to be around 8.5min => (2^9) = 512 secs
@@ -234,25 +256,30 @@ class _Recoverer(object):
 
     @defer.inlineCallbacks
     def retry(self):
+        logger.info("Starting retries on %s", self.service.id)
         try:
-            txn = yield self.store.get_oldest_unsent_txn(self.service)
-            if txn:
+            while True:
+                txn = yield self.store.get_oldest_unsent_txn(self.service)
+                if not txn:
+                    # nothing left: we're done!
+                    self.callback(self)
+                    return
+
                 logger.info(
                     "Retrying transaction %s for AS ID %s", txn.id, txn.service.id
                 )
                 sent = yield txn.send(self.as_api)
-                if sent:
-                    yield txn.complete(self.store)
-                    # reset the backoff counter and retry immediately
-                    self.backoff_counter = 1
-                    yield self.retry()
-                else:
-                    self._backoff()
-            else:
-                self._set_service_recovered()
-        except Exception as e:
-            logger.exception(e)
-            self._backoff()
-
-    def _set_service_recovered(self):
-        self.callback(self)
+                if not sent:
+                    break
+
+                yield txn.complete(self.store)
+
+                # reset the backoff counter and then process the next transaction
+                self.backoff_counter = 1
+
+        except Exception:
+            logger.exception("Unexpected error running retries")
+
+        # we didn't manage to send all of the transactions before we got an error of
+        # some flavour: reschedule the next retry.
+        self._backoff()