summary refs log tree commit diff
path: root/synapse/appservice/scheduler.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-06-20 11:59:14 +0100
committerErik Johnston <erik@matrix.org>2019-06-20 11:59:14 +0100
commit45f28a9d2fc0466dcf2a05b0063b7caa3b7e12c3 (patch)
tree07bb21377c6611db89f64f948a2e27645662ff0e /synapse/appservice/scheduler.py
parentAdd descriptions and remove redundant set(..) (diff)
parentRun Black. (#5482) (diff)
downloadsynapse-45f28a9d2fc0466dcf2a05b0063b7caa3b7e12c3.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/histogram_extremities
Diffstat (limited to 'synapse/appservice/scheduler.py')
-rw-r--r--synapse/appservice/scheduler.py50
1 files changed, 20 insertions, 30 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 685f15c061..b54bf5411f 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -112,15 +112,14 @@ class _ServiceQueuer(object):
             return
 
         run_as_background_process(
-            "as-sender-%s" % (service.id, ),
-            self._send_request, service,
+            "as-sender-%s" % (service.id,), self._send_request, service
         )
 
     @defer.inlineCallbacks
     def _send_request(self, service):
         # sanity-check: we shouldn't get here if this service already has a sender
         # running.
-        assert(service.id not in self.requests_in_flight)
+        assert service.id not in self.requests_in_flight
 
         self.requests_in_flight.add(service.id)
         try:
@@ -137,7 +136,6 @@ class _ServiceQueuer(object):
 
 
 class _TransactionController(object):
-
     def __init__(self, clock, store, as_api, recoverer_fn):
         self.clock = clock
         self.store = store
@@ -149,10 +147,7 @@ class _TransactionController(object):
     @defer.inlineCallbacks
     def send(self, service, events):
         try:
-            txn = yield self.store.create_appservice_txn(
-                service=service,
-                events=events
-            )
+            txn = yield self.store.create_appservice_txn(service=service, events=events)
             service_is_up = yield self._is_service_up(service)
             if service_is_up:
                 sent = yield txn.send(self.as_api)
@@ -167,12 +162,12 @@ class _TransactionController(object):
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):
         self.recoverers.remove(recoverer)
-        logger.info("Successfully recovered application service AS ID %s",
-                    recoverer.service.id)
+        logger.info(
+            "Successfully recovered application service AS ID %s", recoverer.service.id
+        )
         logger.info("Remaining active recoverers: %s", len(self.recoverers))
         yield self.store.set_appservice_state(
-            recoverer.service,
-            ApplicationServiceState.UP
+            recoverer.service, ApplicationServiceState.UP
         )
 
     def add_recoverers(self, recoverers):
@@ -184,13 +179,10 @@ class _TransactionController(object):
     @defer.inlineCallbacks
     def _start_recoverer(self, service):
         try:
-            yield self.store.set_appservice_state(
-                service,
-                ApplicationServiceState.DOWN
-            )
+            yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
             logger.info(
                 "Application service falling behind. Starting recoverer. AS ID %s",
-                service.id
+                service.id,
             )
             recoverer = self.recoverer_fn(service, self.on_recovered)
             self.add_recoverers([recoverer])
@@ -205,19 +197,16 @@ class _TransactionController(object):
 
 
 class _Recoverer(object):
-
     @staticmethod
     @defer.inlineCallbacks
     def start(clock, store, as_api, callback):
-        services = yield store.get_appservices_by_state(
-            ApplicationServiceState.DOWN
-        )
-        recoverers = [
-            _Recoverer(clock, store, as_api, s, callback) for s in services
-        ]
+        services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
+        recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
         for r in recoverers:
-            logger.info("Starting recoverer for AS ID %s which was marked as "
-                        "DOWN", r.service.id)
+            logger.info(
+                "Starting recoverer for AS ID %s which was marked as " "DOWN",
+                r.service.id,
+            )
             r.recover()
         defer.returnValue(recoverers)
 
@@ -232,9 +221,9 @@ class _Recoverer(object):
     def recover(self):
         def _retry():
             run_as_background_process(
-                "as-recoverer-%s" % (self.service.id,),
-                self.retry,
+                "as-recoverer-%s" % (self.service.id,), self.retry
             )
+
         self.clock.call_later((2 ** self.backoff_counter), _retry)
 
     def _backoff(self):
@@ -248,8 +237,9 @@ class _Recoverer(object):
         try:
             txn = yield self.store.get_oldest_unsent_txn(self.service)
             if txn:
-                logger.info("Retrying transaction %s for AS ID %s",
-                            txn.id, txn.service.id)
+                logger.info(
+                    "Retrying transaction %s for AS ID %s", txn.id, txn.service.id
+                )
                 sent = yield txn.send(self.as_api)
                 if sent:
                     yield txn.complete(self.store)