diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 3acb8867a2..2a9becccb3 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -83,9 +83,8 @@ class ApplicationServiceApi(SimpleHttpClient):
uri = service.url + ("/transactions/%s" %
urllib.quote(txn_id))
- response = None
try:
- response = yield self.put_json(
+ yield self.put_json(
uri=uri,
json_body={
"events": events
@@ -93,9 +92,8 @@ class ApplicationServiceApi(SimpleHttpClient):
args={
"access_token": service.hs_token
})
- if response: # just an empty json object
- # TODO: Mark txn as sent successfully
- defer.returnValue(True)
+ defer.returnValue(True)
+ return
except CodeMessageException as e:
logger.warning("push_bulk to %s received %s", uri, e.code)
except Exception as ex:
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 3ee2406463..add1e3879c 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -77,6 +77,7 @@ class AppServiceScheduler(object):
@defer.inlineCallbacks
def start(self):
+ logger.info("Starting appservice scheduler")
# check for any DOWN ASes and start recoverers for them.
recoverers = yield _Recoverer.start(
self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
@@ -137,40 +138,33 @@ class _TransactionController(object):
@defer.inlineCallbacks
def on_recovered(self, recoverer):
self.recoverers.remove(recoverer)
- logger.info("Successfully recovered application service: %s",
- recoverer.service)
- logger.info("Active recoverers: %s", len(self.recoverers))
- applied_state = yield self.store.set_appservice_state(
+ logger.info("Successfully recovered application service AS ID %s",
+ recoverer.service.id)
+ logger.info("Remaining active recoverers: %s", len(self.recoverers))
+ yield self.store.set_appservice_state(
recoverer.service,
ApplicationServiceState.UP
)
- if not applied_state:
- logger.error("Failed to apply appservice state UP to service %s",
- recoverer.service)
def add_recoverers(self, recoverers):
for r in recoverers:
self.recoverers.append(r)
if len(recoverers) > 0:
- logger.info("Active recoverers: %s", len(self.recoverers))
+ logger.info("New active recoverers: %s", len(self.recoverers))
@defer.inlineCallbacks
def _start_recoverer(self, service):
- applied_state = yield self.store.set_appservice_state(
+ yield self.store.set_appservice_state(
service,
ApplicationServiceState.DOWN
)
- if applied_state:
- logger.info(
- "Application service falling behind. Starting recoverer. %s",
- service
- )
- recoverer = self.recoverer_fn(service, self.on_recovered)
- self.add_recoverers([recoverer])
- recoverer.recover()
- else:
- logger.error("Failed to apply appservice state DOWN to service %s",
- service)
+ logger.info(
+ "Application service falling behind. Starting recoverer. AS ID %s",
+ service.id
+ )
+ recoverer = self.recoverer_fn(service, self.on_recovered)
+ self.add_recoverers([recoverer])
+ recoverer.recover()
@defer.inlineCallbacks
def _is_service_up(self, service):
@@ -190,6 +184,8 @@ class _Recoverer(object):
_Recoverer(clock, store, as_api, s, callback) for s in services
]
for r in recoverers:
+ logger.info("Starting recoverer for AS ID %s which was marked as "
+ "DOWN", r.service.id)
r.recover()
defer.returnValue(recoverers)
@@ -206,12 +202,13 @@ class _Recoverer(object):
@defer.inlineCallbacks
def retry(self):
- txn = yield self._get_oldest_txn()
+ txn = yield self.store.get_oldest_unsent_txn(self.service)
if txn:
- logger.info("Retrying transaction %s for service %s",
- txn.id, txn.service)
- if txn.send(self.as_api):
- txn.complete(self.store)
+ logger.info("Retrying transaction %s for AS ID %s",
+ txn.id, txn.service.id)
+ sent = yield txn.send(self.as_api)
+ if sent:
+ yield txn.complete(self.store)
# reset the backoff counter and retry immediately
self.backoff_counter = 1
yield self.retry()
@@ -225,8 +222,3 @@ class _Recoverer(object):
def _set_service_recovered(self):
self.callback(self)
-
- @defer.inlineCallbacks
- def _get_oldest_txn(self):
- txn = yield self.store.get_oldest_unsent_txn(self.service)
- defer.returnValue(txn)
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index fe347dfd3c..c4b4f56c5d 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -293,6 +293,8 @@ class ApplicationServiceStore(SQLBaseStore):
services = {}
for res in results:
as_token = res["token"]
+ if as_token is None:
+ continue
if as_token not in services:
# add the service
services[as_token] = {
@@ -516,11 +518,10 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
# Monotonically increasing txn ids, so just select the smallest
# one in the txns table (we delete them when they are sent)
result = txn.execute(
- "SELECT *,MIN(txn_id) FROM application_services_txns WHERE as_id=?",
+ "SELECT MIN(txn_id), * FROM application_services_txns WHERE as_id=?",
(service.id,)
)
entry = self.cursor_to_dict(result)[0]
-
if not entry or entry["txn_id"] is None:
# the min(txn_id) part will force a row, so entry may not be None
return None
|