summary refs log tree commit diff
path: root/synapse/federation/replication.py
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2014-12-08 19:34:51 +0000
committerMatthew Hodgson <matthew@matrix.org>2014-12-08 19:34:51 +0000
commit8529fba02d93ed1d0d08873f0cbbd58a3194e4af (patch)
treea56b25904e85ac312619454df3251b95d158ea04 /synapse/federation/replication.py
parentadd a write-through cache on the retry schedule (diff)
downloadsynapse-8529fba02d93ed1d0d08873f0cbbd58a3194e4af.tar.xz
fix a million stupid bugs and make it actually work
Diffstat (limited to 'synapse/federation/replication.py')
-rw-r--r--synapse/federation/replication.py25
1 files changed, 17 insertions, 8 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 88184caecd..c4c6667b62 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -685,6 +685,7 @@ class _TransactionQueue(object):
         self.transport_layer = transport_layer
 
         self._clock = hs.get_clock()
+        self.store = hs.get_datastore()
 
         # Is a mapping from destinations -> deferreds. Used to keep track
         # of which destinations have transactions in flight and when they are
@@ -775,11 +776,18 @@ class _TransactionQueue(object):
     @defer.inlineCallbacks
     @log_function
     def _attempt_new_transaction(self, destination):
-        
-        (retry_last_ts, retry_interval) = self.store.get_destination_retry_timings(destination)
-        if retry_last_ts + retry_interval > int(self._clock.time_msec()):
-            logger.info("TX [%s] not ready for retry yet - dropping transaction for now")
-            return
+
+        (retry_last_ts, retry_interval) = (0, 0)
+        retry_timings = yield self.store.get_destination_retry_timings(destination)
+        if retry_timings:
+            (retry_last_ts, retry_interval) = (
+                retry_timings.retry_last_ts, retry_timings.retry_interval
+            )
+            if retry_last_ts + retry_interval > int(self._clock.time_msec()):
+                logger.info("TX [%s] not ready for retry yet - dropping transaction for now", destination)
+                return
+            else:
+                logger.info("TX [%s] is ready for retry", destination)
         
         if destination in self.pending_transactions:
             # XXX: pending_transactions can get stuck on by a never-ending request
@@ -866,7 +874,7 @@ class _TransactionQueue(object):
                 if code == 200:
                     if retry_last_ts:
                         # this host is alive! reset retry schedule
-                        self.store.set_destination_retry_timings(destination, 0, 0)
+                        yield self.store.set_destination_retry_timings(destination, 0, 0)
                     deferred.callback(None)
                 else:
                     self.start_retrying(destination, retry_interval)
@@ -899,12 +907,13 @@ class _TransactionQueue(object):
             # Check to see if there is anything else to send.
             self._attempt_new_transaction(destination)
 
+    @defer.inlineCallbacks
     def start_retrying(self, destination, retry_interval):
         # track that this destination is having problems and we should
         # give it a chance to recover before trying it again
         if retry_interval:
             retry_interval *= 2
         else:
-            retry_interval = 2 # try again at first after 2 seconds
-        self.store.set_destination_retry_timings(destination,
+            retry_interval = 2000 # try again at first after 2 seconds
+        yield self.store.set_destination_retry_timings(destination,
             int(self._clock.time_msec()), retry_interval)