diff options
Diffstat (limited to 'synapse/appservice')
-rw-r--r-- | synapse/appservice/api.py | 8 | ||||
-rw-r--r-- | synapse/appservice/scheduler.py | 52 |
2 files changed, 25 insertions, 35 deletions
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 3acb8867a2..2a9becccb3 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -83,9 +83,8 @@ class ApplicationServiceApi(SimpleHttpClient): uri = service.url + ("/transactions/%s" % urllib.quote(txn_id)) - response = None try: - response = yield self.put_json( + yield self.put_json( uri=uri, json_body={ "events": events @@ -93,9 +92,8 @@ class ApplicationServiceApi(SimpleHttpClient): args={ "access_token": service.hs_token }) - if response: # just an empty json object - # TODO: Mark txn as sent successfully - defer.returnValue(True) + defer.returnValue(True) + return except CodeMessageException as e: logger.warning("push_bulk to %s received %s", uri, e.code) except Exception as ex: diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 3ee2406463..add1e3879c 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -77,6 +77,7 @@ class AppServiceScheduler(object): @defer.inlineCallbacks def start(self): + logger.info("Starting appservice scheduler") # check for any DOWN ASes and start recoverers for them. recoverers = yield _Recoverer.start( self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered @@ -137,40 +138,33 @@ class _TransactionController(object): @defer.inlineCallbacks def on_recovered(self, recoverer): self.recoverers.remove(recoverer) - logger.info("Successfully recovered application service: %s", - recoverer.service) - logger.info("Active recoverers: %s", len(self.recoverers)) - applied_state = yield self.store.set_appservice_state( + logger.info("Successfully recovered application service AS ID %s", + recoverer.service.id) + logger.info("Remaining active recoverers: %s", len(self.recoverers)) + yield self.store.set_appservice_state( recoverer.service, ApplicationServiceState.UP ) - if not applied_state: - logger.error("Failed to apply appservice state UP to service %s", - recoverer.service) def add_recoverers(self, recoverers): for r in recoverers: self.recoverers.append(r) if len(recoverers) > 0: - logger.info("Active recoverers: %s", len(self.recoverers)) + logger.info("New active recoverers: %s", len(self.recoverers)) @defer.inlineCallbacks def _start_recoverer(self, service): - applied_state = yield self.store.set_appservice_state( + yield self.store.set_appservice_state( service, ApplicationServiceState.DOWN ) - if applied_state: - logger.info( - "Application service falling behind. Starting recoverer. %s", - service - ) - recoverer = self.recoverer_fn(service, self.on_recovered) - self.add_recoverers([recoverer]) - recoverer.recover() - else: - logger.error("Failed to apply appservice state DOWN to service %s", - service) + logger.info( + "Application service falling behind. Starting recoverer. AS ID %s", + service.id + ) + recoverer = self.recoverer_fn(service, self.on_recovered) + self.add_recoverers([recoverer]) + recoverer.recover() @defer.inlineCallbacks def _is_service_up(self, service): @@ -190,6 +184,8 @@ class _Recoverer(object): _Recoverer(clock, store, as_api, s, callback) for s in services ] for r in recoverers: + logger.info("Starting recoverer for AS ID %s which was marked as " + "DOWN", r.service.id) r.recover() defer.returnValue(recoverers) @@ -206,12 +202,13 @@ class _Recoverer(object): @defer.inlineCallbacks def retry(self): - txn = yield self._get_oldest_txn() + txn = yield self.store.get_oldest_unsent_txn(self.service) if txn: - logger.info("Retrying transaction %s for service %s", - txn.id, txn.service) - if txn.send(self.as_api): - txn.complete(self.store) + logger.info("Retrying transaction %s for AS ID %s", + txn.id, txn.service.id) + sent = yield txn.send(self.as_api) + if sent: + yield txn.complete(self.store) # reset the backoff counter and retry immediately self.backoff_counter = 1 yield self.retry() @@ -225,8 +222,3 @@ class _Recoverer(object): def _set_service_recovered(self): self.callback(self) - - @defer.inlineCallbacks - def _get_oldest_txn(self): - txn = yield self.store.get_oldest_unsent_txn(self.service) - defer.returnValue(txn) |