summary refs log tree commit diff
path: root/synapse/appservice/scheduler.py
diff options
context:
space:
mode:
authorKegan Dougal <kegan@matrix.org>2015-03-16 10:38:02 +0000
committerKegan Dougal <kegan@matrix.org>2015-03-16 10:38:02 +0000
commitc9c444f56260b414d474ea7e9ae28a1a66400357 (patch)
treeb3be305df32b1c3b8ee6245e3b02c3227d7bfd1f /synapse/appservice/scheduler.py
parentMinor PR comment tweaks. (diff)
downloadsynapse-c9c444f56260b414d474ea7e9ae28a1a66400357.tar.xz
Wrap polling/retry blocks in try/excepts to avoid sending to other ASes breaking permanently should an error occur.
Diffstat (limited to 'synapse/appservice/scheduler.py')
-rw-r--r--synapse/appservice/scheduler.py68
1 files changed, 39 insertions, 29 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 8a3a6a880f..59a870e271 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -120,19 +120,22 @@ class _TransactionController(object):
 
     @defer.inlineCallbacks
     def start_polling(self):
-        groups = self.event_grouper.drain_groups()
-        for service in groups:
-            txn = yield self.store.create_appservice_txn(
-                service=service,
-                events=groups[service]
-            )
-            service_is_up = yield self._is_service_up(service)
-            if service_is_up:
-                sent = yield txn.send(self.as_api)
-                if sent:
-                    txn.complete(self.store)
-                else:
-                    self._start_recoverer(service)
+        try:
+            groups = self.event_grouper.drain_groups()
+            for service in groups:
+                txn = yield self.store.create_appservice_txn(
+                    service=service,
+                    events=groups[service]
+                )
+                service_is_up = yield self._is_service_up(service)
+                if service_is_up:
+                    sent = yield txn.send(self.as_api)
+                    if sent:
+                        txn.complete(self.store)
+                    else:
+                        self._start_recoverer(service)
+        except Exception as e:
+            logger.exception(e)
         self.clock.call_later(1, self.start_polling)
 
     @defer.inlineCallbacks
@@ -200,25 +203,32 @@ class _Recoverer(object):
     def recover(self):
         self.clock.call_later((2 ** self.backoff_counter), self.retry)
 
+    def _backoff(self):
+        # cap the backoff to be around 18h => (2^16) = 65536 secs
+        if self.backoff_counter < 16:
+            self.backoff_counter += 1
+        self.recover()
+
     @defer.inlineCallbacks
     def retry(self):
-        txn = yield self.store.get_oldest_unsent_txn(self.service)
-        if txn:
-            logger.info("Retrying transaction %s for AS ID %s",
-                        txn.id, txn.service.id)
-            sent = yield txn.send(self.as_api)
-            if sent:
-                yield txn.complete(self.store)
-                # reset the backoff counter and retry immediately
-                self.backoff_counter = 1
-                yield self.retry()
+        try:
+            txn = yield self.store.get_oldest_unsent_txn(self.service)
+            if txn:
+                logger.info("Retrying transaction %s for AS ID %s",
+                            txn.id, txn.service.id)
+                sent = yield txn.send(self.as_api)
+                if sent:
+                    yield txn.complete(self.store)
+                    # reset the backoff counter and retry immediately
+                    self.backoff_counter = 1
+                    yield self.retry()
+                else:
+                    self._backoff()
             else:
-                # cap the backoff to be around 18h => (2^16) = 65536 secs
-                if self.backoff_counter < 16:
-                    self.backoff_counter += 1
-                self.recover()
-        else:
-            self._set_service_recovered()
+                self._set_service_recovered()
+        except Exception as e:
+            logger.exception(e)
+            self._backoff()
 
     def _set_service_recovered(self):
         self.callback(self)