diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 9998f822f1..d5204b1314 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -50,8 +50,6 @@ components.
"""
import logging
-from twisted.internet import defer
-
from synapse.appservice import ApplicationServiceState
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -73,12 +71,11 @@ class ApplicationServiceScheduler(object):
self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
- @defer.inlineCallbacks
- def start(self):
+ async def start(self):
logger.info("Starting appservice scheduler")
# check for any DOWN ASes and start recoverers for them.
- services = yield self.store.get_appservices_by_state(
+ services = await self.store.get_appservices_by_state(
ApplicationServiceState.DOWN
)
@@ -117,8 +114,7 @@ class _ServiceQueuer(object):
"as-sender-%s" % (service.id,), self._send_request, service
)
- @defer.inlineCallbacks
- def _send_request(self, service):
+ async def _send_request(self, service):
# sanity-check: we shouldn't get here if this service already has a sender
# running.
assert service.id not in self.requests_in_flight
@@ -130,7 +126,7 @@ class _ServiceQueuer(object):
if not events:
return
try:
- yield self.txn_ctrl.send(service, events)
+ await self.txn_ctrl.send(service, events)
except Exception:
logger.exception("AS request failed")
finally:
@@ -162,36 +158,33 @@ class _TransactionController(object):
# for UTs
self.RECOVERER_CLASS = _Recoverer
- @defer.inlineCallbacks
- def send(self, service, events):
+ async def send(self, service, events):
try:
- txn = yield self.store.create_appservice_txn(service=service, events=events)
- service_is_up = yield self._is_service_up(service)
+ txn = await self.store.create_appservice_txn(service=service, events=events)
+ service_is_up = await self._is_service_up(service)
if service_is_up:
- sent = yield txn.send(self.as_api)
+ sent = await txn.send(self.as_api)
if sent:
- yield txn.complete(self.store)
+ await txn.complete(self.store)
else:
run_in_background(self._on_txn_fail, service)
except Exception:
logger.exception("Error creating appservice transaction")
run_in_background(self._on_txn_fail, service)
- @defer.inlineCallbacks
- def on_recovered(self, recoverer):
+ async def on_recovered(self, recoverer):
logger.info(
"Successfully recovered application service AS ID %s", recoverer.service.id
)
self.recoverers.pop(recoverer.service.id)
logger.info("Remaining active recoverers: %s", len(self.recoverers))
- yield self.store.set_appservice_state(
+ await self.store.set_appservice_state(
recoverer.service, ApplicationServiceState.UP
)
- @defer.inlineCallbacks
- def _on_txn_fail(self, service):
+ async def _on_txn_fail(self, service):
try:
- yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
+ await self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
self.start_recoverer(service)
except Exception:
logger.exception("Error starting AS recoverer")
@@ -211,9 +204,8 @@ class _TransactionController(object):
recoverer.recover()
logger.info("Now %i active recoverers", len(self.recoverers))
- @defer.inlineCallbacks
- def _is_service_up(self, service):
- state = yield self.store.get_appservice_state(service)
+ async def _is_service_up(self, service):
+ state = await self.store.get_appservice_state(service)
return state == ApplicationServiceState.UP or state is None
@@ -254,25 +246,24 @@ class _Recoverer(object):
self.backoff_counter += 1
self.recover()
- @defer.inlineCallbacks
- def retry(self):
+ async def retry(self):
logger.info("Starting retries on %s", self.service.id)
try:
while True:
- txn = yield self.store.get_oldest_unsent_txn(self.service)
+ txn = await self.store.get_oldest_unsent_txn(self.service)
if not txn:
# nothing left: we're done!
- self.callback(self)
+ await self.callback(self)
return
logger.info(
"Retrying transaction %s for AS ID %s", txn.id, txn.service.id
)
- sent = yield txn.send(self.as_api)
+ sent = await txn.send(self.as_api)
if not sent:
break
- yield txn.complete(self.store)
+ await txn.complete(self.store)
# reset the backoff counter and then process the next transaction
self.backoff_counter = 1
|