diff options
Diffstat (limited to 'synapse/appservice/scheduler.py')
-rw-r--r-- | synapse/appservice/scheduler.py | 50 |
1 files changed, 20 insertions, 30 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 685f15c061..b54bf5411f 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -112,15 +112,14 @@ class _ServiceQueuer(object): return run_as_background_process( - "as-sender-%s" % (service.id, ), - self._send_request, service, + "as-sender-%s" % (service.id,), self._send_request, service ) @defer.inlineCallbacks def _send_request(self, service): # sanity-check: we shouldn't get here if this service already has a sender # running. - assert(service.id not in self.requests_in_flight) + assert service.id not in self.requests_in_flight self.requests_in_flight.add(service.id) try: @@ -137,7 +136,6 @@ class _ServiceQueuer(object): class _TransactionController(object): - def __init__(self, clock, store, as_api, recoverer_fn): self.clock = clock self.store = store @@ -149,10 +147,7 @@ class _TransactionController(object): @defer.inlineCallbacks def send(self, service, events): try: - txn = yield self.store.create_appservice_txn( - service=service, - events=events - ) + txn = yield self.store.create_appservice_txn(service=service, events=events) service_is_up = yield self._is_service_up(service) if service_is_up: sent = yield txn.send(self.as_api) @@ -167,12 +162,12 @@ class _TransactionController(object): @defer.inlineCallbacks def on_recovered(self, recoverer): self.recoverers.remove(recoverer) - logger.info("Successfully recovered application service AS ID %s", - recoverer.service.id) + logger.info( + "Successfully recovered application service AS ID %s", recoverer.service.id + ) logger.info("Remaining active recoverers: %s", len(self.recoverers)) yield self.store.set_appservice_state( - recoverer.service, - ApplicationServiceState.UP + recoverer.service, ApplicationServiceState.UP ) def add_recoverers(self, recoverers): @@ -184,13 +179,10 @@ class _TransactionController(object): @defer.inlineCallbacks def _start_recoverer(self, service): try: - yield self.store.set_appservice_state( - service, - ApplicationServiceState.DOWN - ) + yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN) logger.info( "Application service falling behind. Starting recoverer. AS ID %s", - service.id + service.id, ) recoverer = self.recoverer_fn(service, self.on_recovered) self.add_recoverers([recoverer]) @@ -205,19 +197,16 @@ class _TransactionController(object): class _Recoverer(object): - @staticmethod @defer.inlineCallbacks def start(clock, store, as_api, callback): - services = yield store.get_appservices_by_state( - ApplicationServiceState.DOWN - ) - recoverers = [ - _Recoverer(clock, store, as_api, s, callback) for s in services - ] + services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN) + recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services] for r in recoverers: - logger.info("Starting recoverer for AS ID %s which was marked as " - "DOWN", r.service.id) + logger.info( + "Starting recoverer for AS ID %s which was marked as " "DOWN", + r.service.id, + ) r.recover() defer.returnValue(recoverers) @@ -232,9 +221,9 @@ class _Recoverer(object): def recover(self): def _retry(): run_as_background_process( - "as-recoverer-%s" % (self.service.id,), - self.retry, + "as-recoverer-%s" % (self.service.id,), self.retry ) + self.clock.call_later((2 ** self.backoff_counter), _retry) def _backoff(self): @@ -248,8 +237,9 @@ class _Recoverer(object): try: txn = yield self.store.get_oldest_unsent_txn(self.service) if txn: - logger.info("Retrying transaction %s for AS ID %s", - txn.id, txn.service.id) + logger.info( + "Retrying transaction %s for AS ID %s", txn.id, txn.service.id + ) sent = yield txn.send(self.as_api) if sent: yield txn.complete(self.store) |