summary refs log tree commit diff
diff options
context:
space:
mode:
authorKegan Dougal <kegan@matrix.org>2015-03-10 10:04:20 +0000
committerKegan Dougal <kegan@matrix.org>2015-03-10 10:04:20 +0000
commitdb1fbc6c6fb23ab92dd712aa60f0ff46ea76b42c (patch)
tree41ebbc0487f24ec3af9210ef1e0633a35d8ee43c
parentRemove unused import (diff)
downloadsynapse-db1fbc6c6fb23ab92dd712aa60f0ff46ea76b42c.tar.xz
Fix remaining scheduler bugs. Add more informative logging.
Diffstat (limited to '')
-rw-r--r--synapse/appservice/api.py8
-rw-r--r--synapse/appservice/scheduler.py52
-rw-r--r--synapse/storage/appservice.py5
3 files changed, 28 insertions, 37 deletions
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 3acb8867a2..2a9becccb3 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -83,9 +83,8 @@ class ApplicationServiceApi(SimpleHttpClient):
 
         uri = service.url + ("/transactions/%s" %
                              urllib.quote(txn_id))
-        response = None
         try:
-            response = yield self.put_json(
+            yield self.put_json(
                 uri=uri,
                 json_body={
                     "events": events
@@ -93,9 +92,8 @@ class ApplicationServiceApi(SimpleHttpClient):
                 args={
                     "access_token": service.hs_token
                 })
-            if response:  # just an empty json object
-                # TODO: Mark txn as sent successfully
-                defer.returnValue(True)
+            defer.returnValue(True)
+            return
         except CodeMessageException as e:
             logger.warning("push_bulk to %s received %s", uri, e.code)
         except Exception as ex:
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 3ee2406463..add1e3879c 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -77,6 +77,7 @@ class AppServiceScheduler(object):
 
     @defer.inlineCallbacks
     def start(self):
+        logger.info("Starting appservice scheduler")
         # check for any DOWN ASes and start recoverers for them.
         recoverers = yield _Recoverer.start(
             self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
@@ -137,40 +138,33 @@ class _TransactionController(object):
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):
         self.recoverers.remove(recoverer)
-        logger.info("Successfully recovered application service: %s",
-                    recoverer.service)
-        logger.info("Active recoverers: %s", len(self.recoverers))
-        applied_state = yield self.store.set_appservice_state(
+        logger.info("Successfully recovered application service AS ID %s",
+                    recoverer.service.id)
+        logger.info("Remaining active recoverers: %s", len(self.recoverers))
+        yield self.store.set_appservice_state(
             recoverer.service,
             ApplicationServiceState.UP
         )
-        if not applied_state:
-            logger.error("Failed to apply appservice state UP to service %s",
-                         recoverer.service)
 
     def add_recoverers(self, recoverers):
         for r in recoverers:
             self.recoverers.append(r)
         if len(recoverers) > 0:
-            logger.info("Active recoverers: %s", len(self.recoverers))
+            logger.info("New active recoverers: %s", len(self.recoverers))
 
     @defer.inlineCallbacks
     def _start_recoverer(self, service):
-        applied_state = yield self.store.set_appservice_state(
+        yield self.store.set_appservice_state(
             service,
             ApplicationServiceState.DOWN
         )
-        if applied_state:
-            logger.info(
-                "Application service falling behind. Starting recoverer. %s",
-                service
-            )
-            recoverer = self.recoverer_fn(service, self.on_recovered)
-            self.add_recoverers([recoverer])
-            recoverer.recover()
-        else:
-            logger.error("Failed to apply appservice state DOWN to service %s",
-                         service)
+        logger.info(
+            "Application service falling behind. Starting recoverer. AS ID %s",
+            service.id
+        )
+        recoverer = self.recoverer_fn(service, self.on_recovered)
+        self.add_recoverers([recoverer])
+        recoverer.recover()
 
     @defer.inlineCallbacks
     def _is_service_up(self, service):
@@ -190,6 +184,8 @@ class _Recoverer(object):
             _Recoverer(clock, store, as_api, s, callback) for s in services
         ]
         for r in recoverers:
+            logger.info("Starting recoverer for AS ID %s which was marked as "
+                        "DOWN", r.service.id)
             r.recover()
         defer.returnValue(recoverers)
 
@@ -206,12 +202,13 @@ class _Recoverer(object):
 
     @defer.inlineCallbacks
     def retry(self):
-        txn = yield self._get_oldest_txn()
+        txn = yield self.store.get_oldest_unsent_txn(self.service)
         if txn:
-            logger.info("Retrying transaction %s for service %s",
-                        txn.id, txn.service)
-            if txn.send(self.as_api):
-                txn.complete(self.store)
+            logger.info("Retrying transaction %s for AS ID %s",
+                        txn.id, txn.service.id)
+            sent = yield txn.send(self.as_api)
+            if sent:
+                yield txn.complete(self.store)
                 # reset the backoff counter and retry immediately
                 self.backoff_counter = 1
                 yield self.retry()
@@ -225,8 +222,3 @@ class _Recoverer(object):
 
     def _set_service_recovered(self):
         self.callback(self)
-
-    @defer.inlineCallbacks
-    def _get_oldest_txn(self):
-        txn = yield self.store.get_oldest_unsent_txn(self.service)
-        defer.returnValue(txn)
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index fe347dfd3c..c4b4f56c5d 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -293,6 +293,8 @@ class ApplicationServiceStore(SQLBaseStore):
         services = {}
         for res in results:
             as_token = res["token"]
+            if as_token is None:
+                continue
             if as_token not in services:
                 # add the service
                 services[as_token] = {
@@ -516,11 +518,10 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
         # Monotonically increasing txn ids, so just select the smallest
         # one in the txns table (we delete them when they are sent)
         result = txn.execute(
-            "SELECT *,MIN(txn_id) FROM application_services_txns WHERE as_id=?",
+            "SELECT MIN(txn_id), * FROM application_services_txns WHERE as_id=?",
             (service.id,)
         )
         entry = self.cursor_to_dict(result)[0]
-
         if not entry or entry["txn_id"] is None:
             # the min(txn_id) part will force a row, so entry may not be None
             return None