diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index b3c3bf55bc..289c117396 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -165,7 +165,7 @@ class TransactionStore(SQLBaseStore):
txn,
table="destinations",
keyvalues={"destination": destination},
- retcols=("destination", "retry_last_ts", "retry_interval"),
+ retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"),
allow_none=True,
)
@@ -174,12 +174,15 @@ class TransactionStore(SQLBaseStore):
else:
return None
- def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval):
+ def set_destination_retry_timings(
+ self, destination, failure_ts, retry_last_ts, retry_interval
+ ):
"""Sets the current retry timings for a given destination.
Both timings should be zero if retrying is no longer occuring.
Args:
destination (str)
+ failure_ts (int|None) - when the server started failing (ms since epoch)
retry_last_ts (int) - time of last retry attempt in unix epoch ms
retry_interval (int) - how long until next retry in ms
"""
@@ -189,12 +192,13 @@ class TransactionStore(SQLBaseStore):
"set_destination_retry_timings",
self._set_destination_retry_timings,
destination,
+ failure_ts,
retry_last_ts,
retry_interval,
)
def _set_destination_retry_timings(
- self, txn, destination, retry_last_ts, retry_interval
+ self, txn, destination, failure_ts, retry_last_ts, retry_interval
):
if self.database_engine.can_native_upsert:
@@ -202,9 +206,12 @@ class TransactionStore(SQLBaseStore):
# resetting it) or greater than the existing retry interval.
sql = """
- INSERT INTO destinations (destination, retry_last_ts, retry_interval)
- VALUES (?, ?, ?)
+ INSERT INTO destinations (
+ destination, failure_ts, retry_last_ts, retry_interval
+ )
+ VALUES (?, ?, ?, ?)
ON CONFLICT (destination) DO UPDATE SET
+ failure_ts = EXCLUDED.failure_ts,
retry_last_ts = EXCLUDED.retry_last_ts,
retry_interval = EXCLUDED.retry_interval
WHERE
@@ -212,7 +219,7 @@ class TransactionStore(SQLBaseStore):
OR destinations.retry_interval < EXCLUDED.retry_interval
"""
- txn.execute(sql, (destination, retry_last_ts, retry_interval))
+ txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
return
@@ -225,7 +232,7 @@ class TransactionStore(SQLBaseStore):
txn,
table="destinations",
keyvalues={"destination": destination},
- retcols=("retry_last_ts", "retry_interval"),
+ retcols=("failure_ts", "retry_last_ts", "retry_interval"),
allow_none=True,
)
@@ -235,6 +242,7 @@ class TransactionStore(SQLBaseStore):
table="destinations",
values={
"destination": destination,
+ "failure_ts": failure_ts,
"retry_last_ts": retry_last_ts,
"retry_interval": retry_interval,
},
@@ -245,31 +253,12 @@ class TransactionStore(SQLBaseStore):
"destinations",
keyvalues={"destination": destination},
updatevalues={
+ "failure_ts": failure_ts,
"retry_last_ts": retry_last_ts,
"retry_interval": retry_interval,
},
)
- def get_destinations_needing_retry(self):
- """Get all destinations which are due a retry for sending a transaction.
-
- Returns:
- list: A list of dicts
- """
-
- return self.runInteraction(
- "get_destinations_needing_retry", self._get_destinations_needing_retry
- )
-
- def _get_destinations_needing_retry(self, txn):
- query = (
- "SELECT * FROM destinations"
- " WHERE retry_last_ts > 0 and retry_next_ts < ?"
- )
-
- txn.execute(query, (self._clock.time_msec(),))
- return self.cursor_to_dict(txn)
-
def _start_cleanup_transactions(self):
return run_as_background_process(
"cleanup_transactions", self._cleanup_transactions
|