From 90565d015e97a494f516cc6f06596ca5c6d490ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Nov 2016 17:45:44 +0000 Subject: Invalidate retry cache in both directions --- synapse/storage/transactions.py | 48 ++++++++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 12 deletions(-) (limited to 'synapse/storage/transactions.py') diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index adab520c78..ee2efb0d36 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -14,6 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore +from synapse.storage.engines import PostgresEngine from synapse.util.caches.descriptors import cached from twisted.internet import defer @@ -200,25 +201,48 @@ class TransactionStore(SQLBaseStore): def _set_destination_retry_timings(self, txn, destination, retry_last_ts, retry_interval): - txn.call_after(self.get_destination_retry_timings.invalidate, (destination,)) + self.database_engine.lock_table(txn, "destinations") - self._simple_upsert_txn( + self._invalidate_cache_and_stream( + txn, self.get_destination_retry_timings, (destination,) + ) + + # We need to be careful here as the data may have changed from under us + # due to a worker setting the timings. + + prev_row = self._simple_select_one_txn( txn, - "destinations", + table="destinations", keyvalues={ "destination": destination, }, - values={ - "retry_last_ts": retry_last_ts, - "retry_interval": retry_interval, - }, - insertion_values={ - "destination": destination, - "retry_last_ts": retry_last_ts, - "retry_interval": retry_interval, - } + retcols=("retry_last_ts", "retry_interval"), + allow_none=True, ) + if not prev_row: + self._simple_insert_txn( + txn, + table="destinations", + values={ + "destination": destination, + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + } + ) + elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval: + self._simple_update_one_txn( + txn, + "destinations", + keyvalues={ + "destination": destination, + }, + updatevalues={ + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + }, + ) + def get_destinations_needing_retry(self): """Get all destinations which are due a retry for sending a transaction. -- cgit 1.4.1 From 54fed21c049ba89d71242e8c8fc0133fe703395c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Nov 2016 18:18:31 +0000 Subject: Fix tests and flake8 --- synapse/storage/transactions.py | 1 - tests/utils.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/transactions.py') diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index ee2efb0d36..809fdd311f 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -14,7 +14,6 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.storage.engines import PostgresEngine from synapse.util.caches.descriptors import cached from twisted.internet import defer diff --git a/tests/utils.py b/tests/utils.py index bf6449a0fc..ab2252d24c 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -53,6 +53,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): config.trusted_third_party_id_servers = [] config.room_invite_state_types = [] config.password_providers = [] + config.worker_replication_url = "" config.use_frozen_dicts = True config.database_config = {"name": "sqlite3"} -- cgit 1.4.1