summary refs log tree commit diff
path: root/synapse/appservice/scheduler.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/appservice/scheduler.py')
-rw-r--r--synapse/appservice/scheduler.py49
1 files changed, 20 insertions, 29 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 9998f822f1..d5204b1314 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -50,8 +50,6 @@ components.
 """
 import logging
 
-from twisted.internet import defer
-
 from synapse.appservice import ApplicationServiceState
 from synapse.logging.context import run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -73,12 +71,11 @@ class ApplicationServiceScheduler(object):
         self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
         self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
 
-    @defer.inlineCallbacks
-    def start(self):
+    async def start(self):
         logger.info("Starting appservice scheduler")
 
         # check for any DOWN ASes and start recoverers for them.
-        services = yield self.store.get_appservices_by_state(
+        services = await self.store.get_appservices_by_state(
             ApplicationServiceState.DOWN
         )
 
@@ -117,8 +114,7 @@ class _ServiceQueuer(object):
             "as-sender-%s" % (service.id,), self._send_request, service
         )
 
-    @defer.inlineCallbacks
-    def _send_request(self, service):
+    async def _send_request(self, service):
         # sanity-check: we shouldn't get here if this service already has a sender
         # running.
         assert service.id not in self.requests_in_flight
@@ -130,7 +126,7 @@ class _ServiceQueuer(object):
                 if not events:
                     return
                 try:
-                    yield self.txn_ctrl.send(service, events)
+                    await self.txn_ctrl.send(service, events)
                 except Exception:
                     logger.exception("AS request failed")
         finally:
@@ -162,36 +158,33 @@ class _TransactionController(object):
         # for UTs
         self.RECOVERER_CLASS = _Recoverer
 
-    @defer.inlineCallbacks
-    def send(self, service, events):
+    async def send(self, service, events):
         try:
-            txn = yield self.store.create_appservice_txn(service=service, events=events)
-            service_is_up = yield self._is_service_up(service)
+            txn = await self.store.create_appservice_txn(service=service, events=events)
+            service_is_up = await self._is_service_up(service)
             if service_is_up:
-                sent = yield txn.send(self.as_api)
+                sent = await txn.send(self.as_api)
                 if sent:
-                    yield txn.complete(self.store)
+                    await txn.complete(self.store)
                 else:
                     run_in_background(self._on_txn_fail, service)
         except Exception:
             logger.exception("Error creating appservice transaction")
             run_in_background(self._on_txn_fail, service)
 
-    @defer.inlineCallbacks
-    def on_recovered(self, recoverer):
+    async def on_recovered(self, recoverer):
         logger.info(
             "Successfully recovered application service AS ID %s", recoverer.service.id
         )
         self.recoverers.pop(recoverer.service.id)
         logger.info("Remaining active recoverers: %s", len(self.recoverers))
-        yield self.store.set_appservice_state(
+        await self.store.set_appservice_state(
             recoverer.service, ApplicationServiceState.UP
         )
 
-    @defer.inlineCallbacks
-    def _on_txn_fail(self, service):
+    async def _on_txn_fail(self, service):
         try:
-            yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
+            await self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
             self.start_recoverer(service)
         except Exception:
             logger.exception("Error starting AS recoverer")
@@ -211,9 +204,8 @@ class _TransactionController(object):
         recoverer.recover()
         logger.info("Now %i active recoverers", len(self.recoverers))
 
-    @defer.inlineCallbacks
-    def _is_service_up(self, service):
-        state = yield self.store.get_appservice_state(service)
+    async def _is_service_up(self, service):
+        state = await self.store.get_appservice_state(service)
         return state == ApplicationServiceState.UP or state is None
 
 
@@ -254,25 +246,24 @@ class _Recoverer(object):
             self.backoff_counter += 1
         self.recover()
 
-    @defer.inlineCallbacks
-    def retry(self):
+    async def retry(self):
         logger.info("Starting retries on %s", self.service.id)
         try:
             while True:
-                txn = yield self.store.get_oldest_unsent_txn(self.service)
+                txn = await self.store.get_oldest_unsent_txn(self.service)
                 if not txn:
                     # nothing left: we're done!
-                    self.callback(self)
+                    await self.callback(self)
                     return
 
                 logger.info(
                     "Retrying transaction %s for AS ID %s", txn.id, txn.service.id
                 )
-                sent = yield txn.send(self.as_api)
+                sent = await txn.send(self.as_api)
                 if not sent:
                     break
 
-                yield txn.complete(self.store)
+                await txn.complete(self.store)
 
                 # reset the backoff counter and then process the next transaction
                 self.backoff_counter = 1