From be09c23ff02bee9c63611df528a269fb157f2f3c Mon Sep 17 00:00:00 2001
From: Kegan Dougal
Date: Thu, 5 Mar 2015 15:40:07 +0000
Subject: Add txn_id kwarg to push methods

---
 synapse/appservice/api.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

(limited to 'synapse/appservice/api.py')

diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index c2179f8d55..c17fb219c5 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -72,11 +72,16 @@ class ApplicationServiceApi(SimpleHttpClient):
             defer.returnValue(False)
 
     @defer.inlineCallbacks
-    def push_bulk(self, service, events):
+    def push_bulk(self, service, events, txn_id=None):
         events = self._serialize(events)
 
+        if txn_id is None:
+            logger.warning("push_bulk: Missing txn ID sending events to %s",
+                           service.url)
+            txn_id = str(0)
+
         uri = service.url + ("/transactions/%s" %
-                             urllib.quote(str(0)))  # TODO txn_ids
+                             urllib.quote(txn_id))
         response = None
         try:
             response = yield self.put_json(
@@ -97,8 +102,8 @@ class ApplicationServiceApi(SimpleHttpClient):
             defer.returnValue(False)
 
     @defer.inlineCallbacks
-    def push(self, service, event):
-        response = yield self.push_bulk(service, [event])
+    def push(self, service, event, txn_id=None):
+        response = yield self.push_bulk(service, [event], txn_id)
         defer.returnValue(response)
 
     def _serialize(self, events):
-- cgit 1.4.1
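
The new txn_id kwarg lets callers pin each push to a transaction ID instead of the hard-coded "0". A minimal sketch of how a caller might thread it through; the send_transaction helper and its arguments are invented for illustration, only push_bulk's signature comes from the patch above:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def send_transaction(as_api, service, events, txn_id):
        # Passing an explicit txn_id avoids the warning/fallback path above,
        # which would otherwise send every request as transaction "0".
        sent = yield as_api.push_bulk(service, events, txn_id=txn_id)
        defer.returnValue(sent)
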
LEFT JOIN " + "application_services_regex AS r ON a.id = r.as_id") results = yield self._execute_and_decode(sql) services = self._parse_services_dict(results) @@ -528,7 +527,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): return None event_ids = json.loads(entry["event_ids"]) - events = self._get_events_txn(event_ids) + events = self._get_events_txn(txn, event_ids) return AppServiceTransaction( service=service, id=entry["txn_id"], events=events @@ -540,7 +539,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): (service_id,) ) last_txn_id = result.fetchone() - if last_txn_id is None: # no row exists + if last_txn_id is None or last_txn_id[0] is None: # no row exists return 0 else: return int(last_txn_id[0]) # select 'last_txn' col -- cgit 1.4.1 From db1fbc6c6fb23ab92dd712aa60f0ff46ea76b42c Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Tue, 10 Mar 2015 10:04:20 +0000 Subject: Fix remaining scheduler bugs. Add more informative logging. --- synapse/appservice/api.py | 8 +++---- synapse/appservice/scheduler.py | 52 +++++++++++++++++------------------------ synapse/storage/appservice.py | 5 ++-- 3 files changed, 28 insertions(+), 37 deletions(-) (limited to 'synapse/appservice/api.py') diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 3acb8867a2..2a9becccb3 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -83,9 +83,8 @@ class ApplicationServiceApi(SimpleHttpClient): uri = service.url + ("/transactions/%s" % urllib.quote(txn_id)) - response = None try: - response = yield self.put_json( + yield self.put_json( uri=uri, json_body={ "events": events @@ -93,9 +92,8 @@ class ApplicationServiceApi(SimpleHttpClient): args={ "access_token": service.hs_token }) - if response: # just an empty json object - # TODO: Mark txn as sent successfully - defer.returnValue(True) + defer.returnValue(True) + return except CodeMessageException as e: logger.warning("push_bulk to %s received %s", uri, e.code) except Exception as ex: diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 3ee2406463..add1e3879c 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -77,6 +77,7 @@ class AppServiceScheduler(object): @defer.inlineCallbacks def start(self): + logger.info("Starting appservice scheduler") # check for any DOWN ASes and start recoverers for them. 
From db1fbc6c6fb23ab92dd712aa60f0ff46ea76b42c Mon Sep 17 00:00:00 2001
From: Kegan Dougal
Date: Tue, 10 Mar 2015 10:04:20 +0000
Subject: Fix remaining scheduler bugs. Add more informative logging.

---
 synapse/appservice/api.py       |  8 +++----
 synapse/appservice/scheduler.py | 52 +++++++++++++++++------------------
 synapse/storage/appservice.py   |  5 ++--
 3 files changed, 28 insertions(+), 37 deletions(-)

(limited to 'synapse/appservice/api.py')

diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 3acb8867a2..2a9becccb3 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -83,9 +83,8 @@ class ApplicationServiceApi(SimpleHttpClient):
 
         uri = service.url + ("/transactions/%s" %
                              urllib.quote(txn_id))
-        response = None
         try:
-            response = yield self.put_json(
+            yield self.put_json(
                 uri=uri,
                 json_body={
                     "events": events
@@ -93,9 +92,8 @@ class ApplicationServiceApi(SimpleHttpClient):
                 args={
                     "access_token": service.hs_token
                 })
-            if response:  # just an empty json object
-                # TODO: Mark txn as sent successfully
-                defer.returnValue(True)
+            defer.returnValue(True)
+            return
         except CodeMessageException as e:
             logger.warning("push_bulk to %s received %s", uri, e.code)
         except Exception as ex:
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 3ee2406463..add1e3879c 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -77,6 +77,7 @@ class AppServiceScheduler(object):
 
     @defer.inlineCallbacks
     def start(self):
+        logger.info("Starting appservice scheduler")
         # check for any DOWN ASes and start recoverers for them.
         recoverers = yield _Recoverer.start(
             self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
@@ -137,40 +138,33 @@ class _TransactionController(object):
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):
         self.recoverers.remove(recoverer)
-        logger.info("Successfully recovered application service: %s",
-                    recoverer.service)
-        logger.info("Active recoverers: %s", len(self.recoverers))
-        applied_state = yield self.store.set_appservice_state(
+        logger.info("Successfully recovered application service AS ID %s",
+                    recoverer.service.id)
+        logger.info("Remaining active recoverers: %s", len(self.recoverers))
+        yield self.store.set_appservice_state(
             recoverer.service,
             ApplicationServiceState.UP
         )
-        if not applied_state:
-            logger.error("Failed to apply appservice state UP to service %s",
-                         recoverer.service)
 
     def add_recoverers(self, recoverers):
         for r in recoverers:
             self.recoverers.append(r)
         if len(recoverers) > 0:
-            logger.info("Active recoverers: %s", len(self.recoverers))
+            logger.info("New active recoverers: %s", len(self.recoverers))
 
     @defer.inlineCallbacks
     def _start_recoverer(self, service):
-        applied_state = yield self.store.set_appservice_state(
+        yield self.store.set_appservice_state(
             service,
             ApplicationServiceState.DOWN
         )
-        if applied_state:
-            logger.info(
-                "Application service falling behind. Starting recoverer. %s",
-                service
-            )
-            recoverer = self.recoverer_fn(service, self.on_recovered)
-            self.add_recoverers([recoverer])
-            recoverer.recover()
-        else:
-            logger.error("Failed to apply appservice state DOWN to service %s",
-                         service)
+        logger.info(
+            "Application service falling behind. Starting recoverer. AS ID %s",
+            service.id
+        )
+        recoverer = self.recoverer_fn(service, self.on_recovered)
+        self.add_recoverers([recoverer])
+        recoverer.recover()
 
     @defer.inlineCallbacks
     def _is_service_up(self, service):
@@ -190,6 +184,8 @@ class _Recoverer(object):
             _Recoverer(clock, store, as_api, s, callback) for s in services
         ]
         for r in recoverers:
+            logger.info("Starting recoverer for AS ID %s which was marked as "
+                        "DOWN", r.service.id)
             r.recover()
         defer.returnValue(recoverers)
@@ -206,12 +202,13 @@ class _Recoverer(object):
 
     @defer.inlineCallbacks
     def retry(self):
-        txn = yield self._get_oldest_txn()
+        txn = yield self.store.get_oldest_unsent_txn(self.service)
         if txn:
-            logger.info("Retrying transaction %s for service %s",
-                        txn.id, txn.service)
-            if txn.send(self.as_api):
-                txn.complete(self.store)
+            logger.info("Retrying transaction %s for AS ID %s",
+                        txn.id, txn.service.id)
+            sent = yield txn.send(self.as_api)
+            if sent:
+                yield txn.complete(self.store)
                 # reset the backoff counter and retry immediately
                 self.backoff_counter = 1
                 yield self.retry()
@@ -225,8 +222,3 @@ class _Recoverer(object):
 
     def _set_service_recovered(self):
         self.callback(self)
-
-    @defer.inlineCallbacks
-    def _get_oldest_txn(self):
-        txn = yield self.store.get_oldest_unsent_txn(self.service)
-        defer.returnValue(txn)
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index fe347dfd3c..c4b4f56c5d 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -293,6 +293,8 @@ class ApplicationServiceStore(SQLBaseStore):
         services = {}
         for res in results:
             as_token = res["token"]
+            if as_token is None:
+                continue
             if as_token not in services:
                 # add the service
                 services[as_token] = {
@@ -516,11 +518,10 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
         # Monotonically increasing txn ids, so just select the smallest
        # one in the txns table (we delete them when they are sent)
         result = txn.execute(
-            "SELECT *,MIN(txn_id) FROM application_services_txns WHERE as_id=?",
+            "SELECT MIN(txn_id), * FROM application_services_txns WHERE as_id=?",
             (service.id,)
         )
         entry = self.cursor_to_dict(result)[0]
-
         if not entry or entry["txn_id"] is None:
             # the min(txn_id) part will force a row, so entry may not be None
             return None
-- cgit 1.4.1
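
The reworked _Recoverer.retry above now yields on txn.send and txn.complete and reads the oldest unsent transaction straight from the store. A condensed, standalone sketch of that retry-with-backoff loop; the class name, the clock.call_later stand-in, and the failure branch are assumptions for illustration, since the unchanged backoff code is not part of the hunks shown:

    from twisted.internet import defer

    class RetryLoop(object):
        """Illustrative stand-in for _Recoverer's retry loop, not the patched class."""

        def __init__(self, clock, store, as_api, service):
            self.clock = clock      # stand-in exposing call_later(delay_seconds, fn)
            self.store = store      # stand-in exposing get_oldest_unsent_txn(service)
            self.as_api = as_api
            self.service = service
            self.backoff_counter = 1

        @defer.inlineCallbacks
        def retry(self):
            txn = yield self.store.get_oldest_unsent_txn(self.service)
            if txn:
                sent = yield txn.send(self.as_api)
                if sent:
                    yield txn.complete(self.store)
                    # success: reset the backoff counter and drain the queue
                    self.backoff_counter = 1
                    yield self.retry()
                    return
            # nothing sent (or nothing to send): back off exponentially
            self.clock.call_later(2 ** self.backoff_counter, self.retry)
            self.backoff_counter += 1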