diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 00d0f48082..47b73f7458 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -114,7 +114,7 @@ class TransactionStore(SQLBaseStore):
def _prep_send_transaction(self, txn, transaction_id, destination,
origin_server_ts):
- # First we find out what the prev_txs should be.
+ # First we find out what the prev_txns should be.
# Since we know that we are only sending one transaction at a time,
# we can simply take the last one.
query = "%s ORDER BY id DESC LIMIT 1" % (
@@ -205,6 +205,71 @@ class TransactionStore(SQLBaseStore):
return ReceivedTransactionsTable.decode_results(txn.fetchall())
+ def get_destination_retry_timings(self, destination):
+ """Gets the current retry timings (if any) for a given destination.
+
+ Args:
+ destination (str)
+
+ Returns:
+ None if not retrying
+ tuple: (retry_last_ts, retry_interval)
+ retry_ts: time of last retry attempt in unix epoch ms
+ retry_interval: how long until next retry in ms
+ """
+ return self.runInteraction(
+ "get_destination_retry_timings",
+ self._get_destination_retry_timings, destination)
+
+ def _get_destination_retry_timings(cls, txn, destination):
+ query = DestinationsTable.select_statement("destination = ?")
+ txn.execute(query, (destination,))
+ result = DestinationsTable.decode_single_result(txn.fetchone())
+ if result and result[0] > 0:
+ return result
+ else:
+ return None
+
+ def set_destination_retry_timings(self, destination):
+ """Sets the current retry timings for a given destination.
+ Both timings should be zero if retrying is no longer occuring.
+
+ Args:
+ destination (str)
+ retry_last_ts (int) - time of last retry attempt in unix epoch ms
+ retry_interval (int) - how long until next retry in ms
+ """
+ return self.runInteraction(
+ "set_destination_retry_timings",
+ self._set_destination_retry_timings, destination, retry_last_ts, retry_interval)
+
+ def _set_destination_retry_timings(cls, txn, destination, retry_last_ts, retry_interval):
+
+ query = (
+ "INSERT OR REPLACE INTO %s "
+ "(retry_last_ts, retry_interval) "
+ "VALUES (?, ?) "
+ "WHERE destination = ?"
+ ) % DestinationsTable.table_name
+
+ txn.execute(query, (retry_last_ts, retry_interval, destination))
+
+ def get_destinations_needing_retry(self):
+ """Get all destinations which are due a retry for sending a transaction.
+
+ Returns:
+ list: A list of `DestinationsTable.EntryType`
+ """
+ return self.runInteraction(
+ "get_destinations_needing_retry",
+ self._get_destinations_needing_retry
+ )
+
+ def _get_destinations_needing_retry(cls, txn):
+ where = "retry_last_ts > 0 and retry_next_ts < now()"
+ query = DestinationsTable.select_statement(where)
+ txn.execute(query)
+ return DestinationsTable.decode_results(txn.fetchall())
class ReceivedTransactionsTable(Table):
table_name = "received_transactions"
@@ -247,3 +312,14 @@ class TransactionsToPduTable(Table):
]
EntryType = namedtuple("TransactionsToPduEntry", fields)
+
+class DestinationsTable(Table):
+ table_name = "destinations"
+
+ fields = [
+ "destination",
+ "retry_last_ts",
+ "retry_interval",
+ ]
+
+ EntryType = namedtuple("DestinationsEntry", fields)
\ No newline at end of file
|