summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/appservice/scheduler.py4
-rw-r--r--tests/appservice/test_scheduler.py54
2 files changed, 49 insertions, 9 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 754f39381f..f54df9c9a5 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -174,7 +174,7 @@ class _Recoverer(object):
         self.backoff_counter = 1
 
     def recover(self):
-        self.clock.call_later(2000 ** self.backoff_counter, self.retry)
+        self.clock.call_later(1000 * (2 ** self.backoff_counter), self.retry)
 
     @defer.inlineCallbacks
     def retry(self):
@@ -184,7 +184,7 @@ class _Recoverer(object):
                 txn.complete(self.store)
                 # reset the backoff counter and retry immediately
                 self.backoff_counter = 1
-                self.retry()
+                yield self.retry()
             else:
                 self.backoff_counter += 1
                 self.recover()
diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py
index b41d4358cf..1e3eb9e1cc 100644
--- a/tests/appservice/test_scheduler.py
+++ b/tests/appservice/test_scheduler.py
@@ -21,6 +21,7 @@ from ..utils import MockClock
 from mock import Mock
 from tests import unittest
 
+
 class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase):
 
     def setUp(self):
@@ -37,21 +38,60 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase):
             callback=self.callback,
         )
 
-    def test_recover_service_single_txn(self):
-        txns = self._mk_txns(1)
-        self.store.get_oldest_txn = Mock(return_value=defer.succeed(txns[0]))
+    def test_recover_single_txn(self):
+        txn = Mock()
+        # return one txn to send, then no more old txns
+        txns = [txn, None]
+
+        def take_txn(*args, **kwargs):
+            return defer.succeed(txns.pop(0))
+        self.store.get_oldest_txn = Mock(side_effect=take_txn)
 
         self.recoverer.recover()
+        # shouldn't have called anything prior to waiting for exp backoff
         self.assertEquals(0, self.store.get_oldest_txn.call_count)
+        txn.send = Mock(return_value=True)
+        # wait for exp backoff
         self.clock.advance_time(2000)
+        self.assertEquals(1, txn.send.call_count)
+        self.assertEquals(1, txn.complete.call_count)
+        # 2 because it needs to get None to know there are no more txns
         self.assertEquals(2, self.store.get_oldest_txn.call_count)
+        self.assertEquals(1, self.callback.call_count)
 
-    def _mk_txns(self, num_txns):
-        return [
-            Mock() for i in range(num_txns)
-        ]
+    def test_recover_retry_txn(self):
+        txn = Mock()
+        txns = [txn, None]
+        pop_txn = False
 
+        def take_txn(*args, **kwargs):
+            if pop_txn:
+                return defer.succeed(txns.pop(0))
+            else:
+                return defer.succeed(txn)
+        self.store.get_oldest_txn = Mock(side_effect=take_txn)
 
+        self.recoverer.recover()
+        self.assertEquals(0, self.store.get_oldest_txn.call_count)
+        txn.send = Mock(return_value=False)
+        self.clock.advance_time(2000)
+        self.assertEquals(1, txn.send.call_count)
+        self.assertEquals(0, txn.complete.call_count)
+        self.assertEquals(0, self.callback.call_count)
+        self.clock.advance_time(4000)
+        self.assertEquals(2, txn.send.call_count)
+        self.assertEquals(0, txn.complete.call_count)
+        self.assertEquals(0, self.callback.call_count)
+        self.clock.advance_time(8000)
+        self.assertEquals(3, txn.send.call_count)
+        self.assertEquals(0, txn.complete.call_count)
+        self.assertEquals(0, self.callback.call_count)
+        txn.send = Mock(return_value=True)  # successfully send the txn
+        pop_txn = True  # returns the txn the first time, then no more.
+        self.clock.advance_time(16000)
+        self.assertEquals(1, txn.send.call_count)  # new mock reset call count
+        self.assertEquals(1, txn.complete.call_count)
+        self.assertEquals(1, self.callback.call_count)
 
 class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase):