summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/federation/replication.py25
-rw-r--r--synapse/handlers/federation.py3
-rw-r--r--synapse/storage/transactions.py25
3 files changed, 33 insertions, 20 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 88184caecd..c4c6667b62 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -685,6 +685,7 @@ class _TransactionQueue(object):
         self.transport_layer = transport_layer
 
         self._clock = hs.get_clock()
+        self.store = hs.get_datastore()
 
         # Is a mapping from destinations -> deferreds. Used to keep track
         # of which destinations have transactions in flight and when they are
@@ -775,11 +776,18 @@ class _TransactionQueue(object):
     @defer.inlineCallbacks
     @log_function
     def _attempt_new_transaction(self, destination):
-        
-        (retry_last_ts, retry_interval) = self.store.get_destination_retry_timings(destination)
-        if retry_last_ts + retry_interval > int(self._clock.time_msec()):
-            logger.info("TX [%s] not ready for retry yet - dropping transaction for now")
-            return
+
+        (retry_last_ts, retry_interval) = (0, 0)
+        retry_timings = yield self.store.get_destination_retry_timings(destination)
+        if retry_timings:
+            (retry_last_ts, retry_interval) = (
+                retry_timings.retry_last_ts, retry_timings.retry_interval
+            )
+            if retry_last_ts + retry_interval > int(self._clock.time_msec()):
+                logger.info("TX [%s] not ready for retry yet - dropping transaction for now", destination)
+                return
+            else:
+                logger.info("TX [%s] is ready for retry", destination)
         
         if destination in self.pending_transactions:
             # XXX: pending_transactions can get stuck on by a never-ending request
@@ -866,7 +874,7 @@ class _TransactionQueue(object):
                 if code == 200:
                     if retry_last_ts:
                         # this host is alive! reset retry schedule
-                        self.store.set_destination_retry_timings(destination, 0, 0)
+                        yield self.store.set_destination_retry_timings(destination, 0, 0)
                     deferred.callback(None)
                 else:
                     self.start_retrying(destination, retry_interval)
@@ -899,12 +907,13 @@ class _TransactionQueue(object):
             # Check to see if there is anything else to send.
             self._attempt_new_transaction(destination)
 
+    @defer.inlineCallbacks
     def start_retrying(self, destination, retry_interval):
         # track that this destination is having problems and we should
         # give it a chance to recover before trying it again
         if retry_interval:
             retry_interval *= 2
         else:
-            retry_interval = 2 # try again at first after 2 seconds
-        self.store.set_destination_retry_timings(destination,
+            retry_interval = 2000 # try again at first after 2 seconds
+        yield self.store.set_destination_retry_timings(destination,
             int(self._clock.time_msec()), retry_interval)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 7a79e2d117..cfb5029774 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -211,7 +211,8 @@ class FederationHandler(BaseHandler):
         # if we're receiving valid events from an origin,
         # it's probably a good idea to mark it as not in retry-state
         # for sending (although this is a bit of a leap)
-        if ((self.store.get_destination_retry_timings(origin))[0]):
+        retry_timings = yield self.store.get_destination_retry_timings(origin)
+        if (retry_timings and retry_timings.retry_last_ts):
             self.store.set_destination_retry_timings(origin, 0, 0)
 
         room = yield self.store.get_room(event.room_id)
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index fa51766e05..237b024451 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -17,6 +17,8 @@ from ._base import SQLBaseStore, Table
 
 from collections import namedtuple
 
+from twisted.internet import defer
+
 import logging
 
 logger = logging.getLogger(__name__)
@@ -218,8 +220,8 @@ class TransactionStore(SQLBaseStore):
             None if not retrying
             Otherwise a DestinationsTable.EntryType for the retry scheme
         """
-        if self.destination_retry_cache[destination]:
-            return self.destination_retry_cache[destination]
+        if destination in self.destination_retry_cache:
+            return defer.succeed(self.destination_retry_cache[destination])
         
         return self.runInteraction(
             "get_destination_retry_timings",
@@ -228,11 +230,13 @@ class TransactionStore(SQLBaseStore):
     def _get_destination_retry_timings(cls, txn, destination):
         query = DestinationsTable.select_statement("destination = ?")
         txn.execute(query, (destination,))
-        result = DestinationsTable.decode_single_result(txn.fetchone())
-        if result and result.retry_last_ts > 0:
-            return result
-        else:
-            return None
+        result = txn.fetchall()
+        if result:
+            result = DestinationsTable.decode_single_result(result)
+            if result.retry_last_ts > 0:
+                return result
+            else:
+                return None
         
     def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval):
         """Sets the current retry timings for a given destination.
@@ -257,12 +261,11 @@ class TransactionStore(SQLBaseStore):
 
         query = (
             "INSERT OR REPLACE INTO %s "
-            "(retry_last_ts, retry_interval) "
-            "VALUES (?, ?) "
-            "WHERE destination = ?"
+            "(destination, retry_last_ts, retry_interval) "
+            "VALUES (?, ?, ?) "
         ) % DestinationsTable.table_name
 
-        txn.execute(query, (retry_last_ts, retry_interval, destination))
+        txn.execute(query, (destination, retry_last_ts, retry_interval))
 
     def get_destinations_needing_retry(self):
         """Get all destinations which are due a retry for sending a transaction.