diff options
author | Erik Johnston <erik@matrix.org> | 2016-11-22 17:45:44 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-11-22 17:45:44 +0000 |
commit | 90565d015e97a494f516cc6f06596ca5c6d490ec (patch) | |
tree | 77e37de19c6476bf5b9312168b06e74347e005db /synapse/storage/transactions.py | |
parent | Comments (diff) | |
download | synapse-90565d015e97a494f516cc6f06596ca5c6d490ec.tar.xz |
Invalidate retry cache in both directions
Diffstat (limited to 'synapse/storage/transactions.py')
-rw-r--r-- | synapse/storage/transactions.py | 48 |
1 files changed, 36 insertions, 12 deletions
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index adab520c78..ee2efb0d36 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -14,6 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore +from synapse.storage.engines import PostgresEngine from synapse.util.caches.descriptors import cached from twisted.internet import defer @@ -200,25 +201,48 @@ class TransactionStore(SQLBaseStore): def _set_destination_retry_timings(self, txn, destination, retry_last_ts, retry_interval): - txn.call_after(self.get_destination_retry_timings.invalidate, (destination,)) + self.database_engine.lock_table(txn, "destinations") - self._simple_upsert_txn( + self._invalidate_cache_and_stream( + txn, self.get_destination_retry_timings, (destination,) + ) + + # We need to be careful here as the data may have changed from under us + # due to a worker setting the timings. + + prev_row = self._simple_select_one_txn( txn, - "destinations", + table="destinations", keyvalues={ "destination": destination, }, - values={ - "retry_last_ts": retry_last_ts, - "retry_interval": retry_interval, - }, - insertion_values={ - "destination": destination, - "retry_last_ts": retry_last_ts, - "retry_interval": retry_interval, - } + retcols=("retry_last_ts", "retry_interval"), + allow_none=True, ) + if not prev_row: + self._simple_insert_txn( + txn, + table="destinations", + values={ + "destination": destination, + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + } + ) + elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval: + self._simple_update_one_txn( + txn, + "destinations", + keyvalues={ + "destination": destination, + }, + updatevalues={ + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + }, + ) + def get_destinations_needing_retry(self): """Get all destinations which are due a retry for sending a transaction. |