diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index adab520c78..ee2efb0d36 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -14,6 +14,7 @@
# limitations under the License.
from ._base import SQLBaseStore
+from synapse.storage.engines import PostgresEngine
from synapse.util.caches.descriptors import cached
from twisted.internet import defer
@@ -200,25 +201,48 @@ class TransactionStore(SQLBaseStore):
def _set_destination_retry_timings(self, txn, destination,
retry_last_ts, retry_interval):
- txn.call_after(self.get_destination_retry_timings.invalidate, (destination,))
+ self.database_engine.lock_table(txn, "destinations")
- self._simple_upsert_txn(
+ self._invalidate_cache_and_stream(
+ txn, self.get_destination_retry_timings, (destination,)
+ )
+
+ # We need to be careful here as the data may have changed from under us
+ # due to a worker setting the timings.
+
+ prev_row = self._simple_select_one_txn(
txn,
- "destinations",
+ table="destinations",
keyvalues={
"destination": destination,
},
- values={
- "retry_last_ts": retry_last_ts,
- "retry_interval": retry_interval,
- },
- insertion_values={
- "destination": destination,
- "retry_last_ts": retry_last_ts,
- "retry_interval": retry_interval,
- }
+ retcols=("retry_last_ts", "retry_interval"),
+ allow_none=True,
)
+ if not prev_row:
+ self._simple_insert_txn(
+ txn,
+ table="destinations",
+ values={
+ "destination": destination,
+ "retry_last_ts": retry_last_ts,
+ "retry_interval": retry_interval,
+ }
+ )
+ elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval:
+ self._simple_update_one_txn(
+ txn,
+ "destinations",
+ keyvalues={
+ "destination": destination,
+ },
+ updatevalues={
+ "retry_last_ts": retry_last_ts,
+ "retry_interval": retry_interval,
+ },
+ )
+
def get_destinations_needing_retry(self):
"""Get all destinations which are due a retry for sending a transaction.
|