summary refs log tree commit diff
path: root/synapse/storage/transactions.py
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2014-12-07 02:26:07 +0000
committerMatthew Hodgson <matthew@matrix.org>2014-12-07 02:26:07 +0000
commitaed62a35832a3ec1c7425ecc99cab06a781263ba (patch)
tree6dd8bdeb4f79084a084f24f6747c1e1d00fcf973 /synapse/storage/transactions.py
parentPull in latest matrix-angular_sdk (diff)
downloadsynapse-aed62a35832a3ec1c7425ecc99cab06a781263ba.tar.xz
track replication destination health, and perform exponential back-off when sending transactions. does *not* yet retry transactions, but drops them on the floor if waiting for a server to recover.
Diffstat (limited to 'synapse/storage/transactions.py')
-rw-r--r--synapse/storage/transactions.py78
1 files changed, 77 insertions, 1 deletions
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 00d0f48082..47b73f7458 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -114,7 +114,7 @@ class TransactionStore(SQLBaseStore):
     def _prep_send_transaction(self, txn, transaction_id, destination,
                                origin_server_ts):
 
-        # First we find out what the prev_txs should be.
+        # First we find out what the prev_txns should be.
         # Since we know that we are only sending one transaction at a time,
         # we can simply take the last one.
         query = "%s ORDER BY id DESC LIMIT 1" % (
@@ -205,6 +205,71 @@ class TransactionStore(SQLBaseStore):
 
         return ReceivedTransactionsTable.decode_results(txn.fetchall())
 
+    def get_destination_retry_timings(self, destination):
+        """Gets the current retry timings (if any) for a given destination.
+        
+        Args:
+            destination (str)
+        
+        Returns:
+            None if not retrying
+            tuple: (retry_last_ts, retry_interval)
+                retry_ts: time of last retry attempt in unix epoch ms
+                retry_interval: how long until next retry in ms
+        """
+        return self.runInteraction(
+            "get_destination_retry_timings",
+            self._get_destination_retry_timings, destination)
+            
+    def _get_destination_retry_timings(cls, txn, destination):
+        query = DestinationsTable.select_statement("destination = ?")
+        txn.execute(query, (destination,))
+        result = DestinationsTable.decode_single_result(txn.fetchone())
+        if result and result[0] > 0:
+            return result
+        else:
+            return None
+        
+    def set_destination_retry_timings(self, destination):
+        """Sets the current retry timings for a given destination.
+        Both timings should be zero if retrying is no longer occuring.
+        
+        Args:
+            destination (str)
+            retry_last_ts (int) - time of last retry attempt in unix epoch ms
+            retry_interval (int) - how long until next retry in ms
+        """
+        return self.runInteraction(
+            "set_destination_retry_timings",
+            self._set_destination_retry_timings, destination, retry_last_ts, retry_interval)
+            
+    def _set_destination_retry_timings(cls, txn, destination, retry_last_ts, retry_interval):
+
+        query = (
+            "INSERT OR REPLACE INTO %s "
+            "(retry_last_ts, retry_interval) "
+            "VALUES (?, ?) "
+            "WHERE destination = ?"
+        ) % DestinationsTable.table_name
+
+        txn.execute(query, (retry_last_ts, retry_interval, destination))
+
+    def get_destinations_needing_retry(self):
+        """Get all destinations which are due a retry for sending a transaction.
+                    
+        Returns:
+            list: A list of `DestinationsTable.EntryType`
+        """
+        return self.runInteraction(
+            "get_destinations_needing_retry",
+            self._get_destinations_needing_retry
+        )
+        
+    def _get_destinations_needing_retry(cls, txn):
+        where = "retry_last_ts > 0 and retry_next_ts < now()"
+        query = DestinationsTable.select_statement(where)
+        txn.execute(query)
+        return DestinationsTable.decode_results(txn.fetchall())    
 
 class ReceivedTransactionsTable(Table):
     table_name = "received_transactions"
@@ -247,3 +312,14 @@ class TransactionsToPduTable(Table):
     ]
 
     EntryType = namedtuple("TransactionsToPduEntry", fields)
+    
+class DestinationsTable(Table):
+    table_name = "destinations"
+    
+    fields = [
+        "destination",
+        "retry_last_ts",
+        "retry_interval",
+    ]
+
+    EntryType = namedtuple("DestinationsEntry", fields)
\ No newline at end of file