diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 685f15c061..b54bf5411f 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -112,15 +112,14 @@ class _ServiceQueuer(object):
return
run_as_background_process(
- "as-sender-%s" % (service.id, ),
- self._send_request, service,
+ "as-sender-%s" % (service.id,), self._send_request, service
)
@defer.inlineCallbacks
def _send_request(self, service):
# sanity-check: we shouldn't get here if this service already has a sender
# running.
- assert(service.id not in self.requests_in_flight)
+ assert service.id not in self.requests_in_flight
self.requests_in_flight.add(service.id)
try:
@@ -137,7 +136,6 @@ class _ServiceQueuer(object):
class _TransactionController(object):
-
def __init__(self, clock, store, as_api, recoverer_fn):
self.clock = clock
self.store = store
@@ -149,10 +147,7 @@ class _TransactionController(object):
@defer.inlineCallbacks
def send(self, service, events):
try:
- txn = yield self.store.create_appservice_txn(
- service=service,
- events=events
- )
+ txn = yield self.store.create_appservice_txn(service=service, events=events)
service_is_up = yield self._is_service_up(service)
if service_is_up:
sent = yield txn.send(self.as_api)
@@ -167,12 +162,12 @@ class _TransactionController(object):
@defer.inlineCallbacks
def on_recovered(self, recoverer):
self.recoverers.remove(recoverer)
- logger.info("Successfully recovered application service AS ID %s",
- recoverer.service.id)
+ logger.info(
+ "Successfully recovered application service AS ID %s", recoverer.service.id
+ )
logger.info("Remaining active recoverers: %s", len(self.recoverers))
yield self.store.set_appservice_state(
- recoverer.service,
- ApplicationServiceState.UP
+ recoverer.service, ApplicationServiceState.UP
)
def add_recoverers(self, recoverers):
@@ -184,13 +179,10 @@ class _TransactionController(object):
@defer.inlineCallbacks
def _start_recoverer(self, service):
try:
- yield self.store.set_appservice_state(
- service,
- ApplicationServiceState.DOWN
- )
+ yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
logger.info(
"Application service falling behind. Starting recoverer. AS ID %s",
- service.id
+ service.id,
)
recoverer = self.recoverer_fn(service, self.on_recovered)
self.add_recoverers([recoverer])
@@ -205,19 +197,16 @@ class _TransactionController(object):
class _Recoverer(object):
-
@staticmethod
@defer.inlineCallbacks
def start(clock, store, as_api, callback):
- services = yield store.get_appservices_by_state(
- ApplicationServiceState.DOWN
- )
- recoverers = [
- _Recoverer(clock, store, as_api, s, callback) for s in services
- ]
+ services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
+ recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
for r in recoverers:
- logger.info("Starting recoverer for AS ID %s which was marked as "
- "DOWN", r.service.id)
+ logger.info(
+ "Starting recoverer for AS ID %s which was marked as " "DOWN",
+ r.service.id,
+ )
r.recover()
defer.returnValue(recoverers)
@@ -232,9 +221,9 @@ class _Recoverer(object):
def recover(self):
def _retry():
run_as_background_process(
- "as-recoverer-%s" % (self.service.id,),
- self.retry,
+ "as-recoverer-%s" % (self.service.id,), self.retry
)
+
self.clock.call_later((2 ** self.backoff_counter), _retry)
def _backoff(self):
@@ -248,8 +237,9 @@ class _Recoverer(object):
try:
txn = yield self.store.get_oldest_unsent_txn(self.service)
if txn:
- logger.info("Retrying transaction %s for AS ID %s",
- txn.id, txn.service.id)
+ logger.info(
+ "Retrying transaction %s for AS ID %s", txn.id, txn.service.id
+ )
sent = yield txn.send(self.as_api)
if sent:
yield txn.complete(self.store)
|