From db1fbc6c6fb23ab92dd712aa60f0ff46ea76b42c Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 10 Mar 2015 10:04:20 +0000 Subject: Fix remaining scheduler bugs. Add more informative logging. --- synapse/appservice/api.py | 8 +++---- synapse/appservice/scheduler.py | 52 +++++++++++++++++------------------------ synapse/storage/appservice.py | 5 ++-- 3 files changed, 28 insertions(+), 37 deletions(-) (limited to 'synapse') diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 3acb8867a2..2a9becccb3 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -83,9 +83,8 @@ class ApplicationServiceApi(SimpleHttpClient): uri = service.url + ("/transactions/%s" % urllib.quote(txn_id)) - response = None try: - response = yield self.put_json( + yield self.put_json( uri=uri, json_body={ "events": events @@ -93,9 +92,8 @@ class ApplicationServiceApi(SimpleHttpClient): args={ "access_token": service.hs_token }) - if response: # just an empty json object - # TODO: Mark txn as sent successfully - defer.returnValue(True) + defer.returnValue(True) + return except CodeMessageException as e: logger.warning("push_bulk to %s received %s", uri, e.code) except Exception as ex: diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 3ee2406463..add1e3879c 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -77,6 +77,7 @@ class AppServiceScheduler(object): @defer.inlineCallbacks def start(self): + logger.info("Starting appservice scheduler") # check for any DOWN ASes and start recoverers for them. recoverers = yield _Recoverer.start( self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered @@ -137,40 +138,33 @@ class _TransactionController(object): @defer.inlineCallbacks def on_recovered(self, recoverer): self.recoverers.remove(recoverer) - logger.info("Successfully recovered application service: %s", - recoverer.service) - logger.info("Active recoverers: %s", len(self.recoverers)) - applied_state = yield self.store.set_appservice_state( + logger.info("Successfully recovered application service AS ID %s", + recoverer.service.id) + logger.info("Remaining active recoverers: %s", len(self.recoverers)) + yield self.store.set_appservice_state( recoverer.service, ApplicationServiceState.UP ) - if not applied_state: - logger.error("Failed to apply appservice state UP to service %s", - recoverer.service) def add_recoverers(self, recoverers): for r in recoverers: self.recoverers.append(r) if len(recoverers) > 0: - logger.info("Active recoverers: %s", len(self.recoverers)) + logger.info("New active recoverers: %s", len(self.recoverers)) @defer.inlineCallbacks def _start_recoverer(self, service): - applied_state = yield self.store.set_appservice_state( + yield self.store.set_appservice_state( service, ApplicationServiceState.DOWN ) - if applied_state: - logger.info( - "Application service falling behind. Starting recoverer. %s", - service - ) - recoverer = self.recoverer_fn(service, self.on_recovered) - self.add_recoverers([recoverer]) - recoverer.recover() - else: - logger.error("Failed to apply appservice state DOWN to service %s", - service) + logger.info( + "Application service falling behind. Starting recoverer. AS ID %s", + service.id + ) + recoverer = self.recoverer_fn(service, self.on_recovered) + self.add_recoverers([recoverer]) + recoverer.recover() @defer.inlineCallbacks def _is_service_up(self, service): @@ -190,6 +184,8 @@ class _Recoverer(object): _Recoverer(clock, store, as_api, s, callback) for s in services ] for r in recoverers: + logger.info("Starting recoverer for AS ID %s which was marked as " + "DOWN", r.service.id) r.recover() defer.returnValue(recoverers) @@ -206,12 +202,13 @@ class _Recoverer(object): @defer.inlineCallbacks def retry(self): - txn = yield self._get_oldest_txn() + txn = yield self.store.get_oldest_unsent_txn(self.service) if txn: - logger.info("Retrying transaction %s for service %s", - txn.id, txn.service) - if txn.send(self.as_api): - txn.complete(self.store) + logger.info("Retrying transaction %s for AS ID %s", + txn.id, txn.service.id) + sent = yield txn.send(self.as_api) + if sent: + yield txn.complete(self.store) # reset the backoff counter and retry immediately self.backoff_counter = 1 yield self.retry() @@ -225,8 +222,3 @@ class _Recoverer(object): def _set_service_recovered(self): self.callback(self) - - @defer.inlineCallbacks - def _get_oldest_txn(self): - txn = yield self.store.get_oldest_unsent_txn(self.service) - defer.returnValue(txn) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index fe347dfd3c..c4b4f56c5d 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -293,6 +293,8 @@ class ApplicationServiceStore(SQLBaseStore): services = {} for res in results: as_token = res["token"] + if as_token is None: + continue if as_token not in services: # add the service services[as_token] = { @@ -516,11 +518,10 @@ class ApplicationServiceTransactionStore(SQLBaseStore): # Monotonically increasing txn ids, so just select the smallest # one in the txns table (we delete them when they are sent) result = txn.execute( - "SELECT *,MIN(txn_id) FROM application_services_txns WHERE as_id=?", + "SELECT MIN(txn_id), * FROM application_services_txns WHERE as_id=?", (service.id,) ) entry = self.cursor_to_dict(result)[0] - if not entry or entry["txn_id"] is None: # the min(txn_id) part will force a row, so entry may not be None return None -- cgit 1.4.1