summary refs log tree commit diff
path: root/synapse/storage/transactions.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-11-22 17:45:44 +0000
committerErik Johnston <erik@matrix.org>2016-11-22 17:45:44 +0000
commit90565d015e97a494f516cc6f06596ca5c6d490ec (patch)
tree77e37de19c6476bf5b9312168b06e74347e005db /synapse/storage/transactions.py
parentComments (diff)
downloadsynapse-90565d015e97a494f516cc6f06596ca5c6d490ec.tar.xz
Invalidate retry cache in both directions
Diffstat (limited to 'synapse/storage/transactions.py')
-rw-r--r--synapse/storage/transactions.py48
1 files changed, 36 insertions, 12 deletions
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index adab520c78..ee2efb0d36 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from ._base import SQLBaseStore
+from synapse.storage.engines import PostgresEngine
 from synapse.util.caches.descriptors import cached
 
 from twisted.internet import defer
@@ -200,25 +201,48 @@ class TransactionStore(SQLBaseStore):
 
     def _set_destination_retry_timings(self, txn, destination,
                                        retry_last_ts, retry_interval):
-        txn.call_after(self.get_destination_retry_timings.invalidate, (destination,))
+        self.database_engine.lock_table(txn, "destinations")
 
-        self._simple_upsert_txn(
+        self._invalidate_cache_and_stream(
+            txn, self.get_destination_retry_timings, (destination,)
+        )
+
+        # We need to be careful here as the data may have changed from under us
+        # due to a worker setting the timings.
+
+        prev_row = self._simple_select_one_txn(
             txn,
-            "destinations",
+            table="destinations",
             keyvalues={
                 "destination": destination,
             },
-            values={
-                "retry_last_ts": retry_last_ts,
-                "retry_interval": retry_interval,
-            },
-            insertion_values={
-                "destination": destination,
-                "retry_last_ts": retry_last_ts,
-                "retry_interval": retry_interval,
-            }
+            retcols=("retry_last_ts", "retry_interval"),
+            allow_none=True,
         )
 
+        if not prev_row:
+            self._simple_insert_txn(
+                txn,
+                table="destinations",
+                values={
+                    "destination": destination,
+                    "retry_last_ts": retry_last_ts,
+                    "retry_interval": retry_interval,
+                }
+            )
+        elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval:
+            self._simple_update_one_txn(
+                txn,
+                "destinations",
+                keyvalues={
+                    "destination": destination,
+                },
+                updatevalues={
+                    "retry_last_ts": retry_last_ts,
+                    "retry_interval": retry_interval,
+                },
+            )
+
     def get_destinations_needing_retry(self):
         """Get all destinations which are due a retry for sending a transaction.