diff options
Diffstat (limited to 'synapse/storage/transactions.py')
-rw-r--r-- | synapse/storage/transactions.py | 50 |
1 files changed, 29 insertions, 21 deletions
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 2b16787695..423cc3f02a 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -27,7 +27,7 @@ logger = logging.getLogger(__name__) class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ - + # a write-through cache of DestinationsTable.EntryType indexed by # destination string destination_retry_cache = {} @@ -213,21 +213,21 @@ class TransactionStore(SQLBaseStore): def get_destination_retry_timings(self, destination): """Gets the current retry timings (if any) for a given destination. - + Args: destination (str) - + Returns: None if not retrying Otherwise a DestinationsTable.EntryType for the retry scheme """ if destination in self.destination_retry_cache: return defer.succeed(self.destination_retry_cache[destination]) - + return self.runInteraction( "get_destination_retry_timings", self._get_destination_retry_timings, destination) - + def _get_destination_retry_timings(cls, txn, destination): query = DestinationsTable.select_statement("destination = ?") txn.execute(query, (destination,)) @@ -238,30 +238,36 @@ class TransactionStore(SQLBaseStore): return result else: return None - + def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval): """Sets the current retry timings for a given destination. Both timings should be zero if retrying is no longer occuring. - + Args: destination (str) retry_last_ts (int) - time of last retry attempt in unix epoch ms retry_interval (int) - how long until next retry in ms """ - + self.destination_retry_cache[destination] = ( - DestinationsTable.EntryType(destination, - retry_last_ts, retry_interval) + DestinationsTable.EntryType( + destination, + retry_last_ts, + retry_interval + ) ) - + # XXX: we could chose to not bother persisting this if our cache thinks # this is a NOOP return self.runInteraction( "set_destination_retry_timings", - self._set_destination_retry_timings, destination, - retry_last_ts, retry_interval) - + self._set_destination_retry_timings, + destination, + retry_last_ts, + retry_interval, + ) + def _set_destination_retry_timings(cls, txn, destination, retry_last_ts, retry_interval): @@ -275,21 +281,22 @@ class TransactionStore(SQLBaseStore): def get_destinations_needing_retry(self): """Get all destinations which are due a retry for sending a transaction. - + Returns: list: A list of `DestinationsTable.EntryType` """ - + return self.runInteraction( "get_destinations_needing_retry", self._get_destinations_needing_retry ) - + def _get_destinations_needing_retry(cls, txn): where = "retry_last_ts > 0 and retry_next_ts < now()" query = DestinationsTable.select_statement(where) txn.execute(query) - return DestinationsTable.decode_results(txn.fetchall()) + return DestinationsTable.decode_results(txn.fetchall()) + class ReceivedTransactionsTable(Table): table_name = "received_transactions" @@ -332,14 +339,15 @@ class TransactionsToPduTable(Table): ] EntryType = namedtuple("TransactionsToPduEntry", fields) - + + class DestinationsTable(Table): table_name = "destinations" - + fields = [ "destination", "retry_last_ts", "retry_interval", ] - EntryType = namedtuple("DestinationsEntry", fields) \ No newline at end of file + EntryType = namedtuple("DestinationsEntry", fields) |