Wrap the polling/retry blocks in try/except so that, should an error occur, it does not permanently break sending to other ASes.
1 file changed, 39 insertions, 29 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 8a3a6a880f..59a870e271 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -120,19 +120,22 @@ class _TransactionController(object):
@defer.inlineCallbacks
def start_polling(self):
- groups = self.event_grouper.drain_groups()
- for service in groups:
- txn = yield self.store.create_appservice_txn(
- service=service,
- events=groups[service]
- )
- service_is_up = yield self._is_service_up(service)
- if service_is_up:
- sent = yield txn.send(self.as_api)
- if sent:
- txn.complete(self.store)
- else:
- self._start_recoverer(service)
+ try:
+ groups = self.event_grouper.drain_groups()
+ for service in groups:
+ txn = yield self.store.create_appservice_txn(
+ service=service,
+ events=groups[service]
+ )
+ service_is_up = yield self._is_service_up(service)
+ if service_is_up:
+ sent = yield txn.send(self.as_api)
+ if sent:
+ txn.complete(self.store)
+ else:
+ self._start_recoverer(service)
+ except Exception as e:
+ logger.exception(e)
self.clock.call_later(1, self.start_polling)
@defer.inlineCallbacks
@@ -200,25 +203,32 @@ class _Recoverer(object):
def recover(self):
self.clock.call_later((2 ** self.backoff_counter), self.retry)
+ def _backoff(self):
+ # cap the backoff to be around 18h => (2^16) = 65536 secs
+ if self.backoff_counter < 16:
+ self.backoff_counter += 1
+ self.recover()
+
@defer.inlineCallbacks
def retry(self):
- txn = yield self.store.get_oldest_unsent_txn(self.service)
- if txn:
- logger.info("Retrying transaction %s for AS ID %s",
- txn.id, txn.service.id)
- sent = yield txn.send(self.as_api)
- if sent:
- yield txn.complete(self.store)
- # reset the backoff counter and retry immediately
- self.backoff_counter = 1
- yield self.retry()
+ try:
+ txn = yield self.store.get_oldest_unsent_txn(self.service)
+ if txn:
+ logger.info("Retrying transaction %s for AS ID %s",
+ txn.id, txn.service.id)
+ sent = yield txn.send(self.as_api)
+ if sent:
+ yield txn.complete(self.store)
+ # reset the backoff counter and retry immediately
+ self.backoff_counter = 1
+ yield self.retry()
+ else:
+ self._backoff()
else:
- # cap the backoff to be around 18h => (2^16) = 65536 secs
- if self.backoff_counter < 16:
- self.backoff_counter += 1
- self.recover()
- else:
- self._set_service_recovered()
+ self._set_service_recovered()
+ except Exception as e:
+ logger.exception(e)
+ self._backoff()
def _set_service_recovered(self):
self.callback(self)
|