diff options
author | Erik Johnston <erikj@jki.re> | 2016-11-23 15:39:12 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-11-23 15:39:12 +0000 |
commit | 302fbd218ddb00a1d38e778b29cdf043800941e1 (patch) | |
tree | 89e462d2fc8d2470bb66d744e2289050df7edef3 /synapse/storage/transactions.py | |
parent | Merge pull request #1641 from matrix-org/erikj/as_pushers (diff) | |
parent | Shuffle receipt handler around so that worker apps don't need to load it (diff) | |
download | synapse-302fbd218ddb00a1d38e778b29cdf043800941e1.tar.xz |
Merge pull request #1635 from matrix-org/erikj/split_out_fed_txn
Split out federation transaction sending to a worker
Diffstat (limited to 'synapse/storage/transactions.py')
-rw-r--r-- | synapse/storage/transactions.py | 47 |
1 files changed, 35 insertions, 12 deletions
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index adab520c78..809fdd311f 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -200,25 +200,48 @@ class TransactionStore(SQLBaseStore): def _set_destination_retry_timings(self, txn, destination, retry_last_ts, retry_interval): - txn.call_after(self.get_destination_retry_timings.invalidate, (destination,)) + self.database_engine.lock_table(txn, "destinations") - self._simple_upsert_txn( + self._invalidate_cache_and_stream( + txn, self.get_destination_retry_timings, (destination,) + ) + + # We need to be careful here as the data may have changed from under us + # due to a worker setting the timings. + + prev_row = self._simple_select_one_txn( txn, - "destinations", + table="destinations", keyvalues={ "destination": destination, }, - values={ - "retry_last_ts": retry_last_ts, - "retry_interval": retry_interval, - }, - insertion_values={ - "destination": destination, - "retry_last_ts": retry_last_ts, - "retry_interval": retry_interval, - } + retcols=("retry_last_ts", "retry_interval"), + allow_none=True, ) + if not prev_row: + self._simple_insert_txn( + txn, + table="destinations", + values={ + "destination": destination, + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + } + ) + elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval: + self._simple_update_one_txn( + txn, + "destinations", + keyvalues={ + "destination": destination, + }, + updatevalues={ + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + }, + ) + def get_destinations_needing_retry(self): """Get all destinations which are due a retry for sending a transaction. |