diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index e4bdad98bb..c9ab434b4e 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -69,7 +69,7 @@ SCHEMAS = [
# Remember to update this number every time an incompatible change is made to
# database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 8
+SCHEMA_VERSION = 9
class _RollbackButIsFineException(Exception):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 4881f03368..e72200e2f7 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -650,7 +650,7 @@ class JoinHelper(object):
to dump the results into.
Attributes:
- taples (list): List of `Table` classes
+ tables (list): List of `Table` classes
EntryType (type)
"""
diff --git a/synapse/storage/schema/delta/v9.sql b/synapse/storage/schema/delta/v9.sql
new file mode 100644
index 0000000000..ad680c64da
--- /dev/null
+++ b/synapse/storage/schema/delta/v9.sql
@@ -0,0 +1,23 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+ destination TEXT PRIMARY KEY,
+ retry_last_ts INTEGER,
+ retry_interval INTEGER
+);
+
+PRAGMA user_version = 9;
\ No newline at end of file
diff --git a/synapse/storage/schema/transactions.sql b/synapse/storage/schema/transactions.sql
index 88e3e4e04d..de461bfa15 100644
--- a/synapse/storage/schema/transactions.sql
+++ b/synapse/storage/schema/transactions.sql
@@ -59,3 +59,9 @@ CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(tra
CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination);
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+ destination TEXT PRIMARY KEY,
+ retry_last_ts INTEGER,
+ retry_interval INTEGER
+);
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 00d0f48082..423cc3f02a 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -17,6 +17,8 @@ from ._base import SQLBaseStore, Table
from collections import namedtuple
+from twisted.internet import defer
+
import logging
logger = logging.getLogger(__name__)
@@ -26,6 +28,10 @@ class TransactionStore(SQLBaseStore):
"""A collection of queries for handling PDUs.
"""
+ # a write-through cache of DestinationsTable.EntryType indexed by
+ # destination string
+ destination_retry_cache = {}
+
def get_received_txn_response(self, transaction_id, origin):
"""For an incoming transaction from a given origin, check if we have
already responded to it. If so, return the response code and response
@@ -114,7 +120,7 @@ class TransactionStore(SQLBaseStore):
def _prep_send_transaction(self, txn, transaction_id, destination,
origin_server_ts):
- # First we find out what the prev_txs should be.
+ # First we find out what the prev_txns should be.
# Since we know that we are only sending one transaction at a time,
# we can simply take the last one.
query = "%s ORDER BY id DESC LIMIT 1" % (
@@ -205,6 +211,92 @@ class TransactionStore(SQLBaseStore):
return ReceivedTransactionsTable.decode_results(txn.fetchall())
+ def get_destination_retry_timings(self, destination):
+ """Gets the current retry timings (if any) for a given destination.
+
+ Args:
+ destination (str)
+
+ Returns:
+ None if not retrying
+ Otherwise a DestinationsTable.EntryType for the retry scheme
+ """
+ if destination in self.destination_retry_cache:
+ return defer.succeed(self.destination_retry_cache[destination])
+
+ return self.runInteraction(
+ "get_destination_retry_timings",
+ self._get_destination_retry_timings, destination)
+
+ def _get_destination_retry_timings(cls, txn, destination):
+ query = DestinationsTable.select_statement("destination = ?")
+ txn.execute(query, (destination,))
+ result = txn.fetchall()
+ if result:
+ result = DestinationsTable.decode_single_result(result)
+ if result.retry_last_ts > 0:
+ return result
+ else:
+ return None
+
+ def set_destination_retry_timings(self, destination,
+ retry_last_ts, retry_interval):
+ """Sets the current retry timings for a given destination.
+ Both timings should be zero if retrying is no longer occuring.
+
+ Args:
+ destination (str)
+ retry_last_ts (int) - time of last retry attempt in unix epoch ms
+ retry_interval (int) - how long until next retry in ms
+ """
+
+ self.destination_retry_cache[destination] = (
+ DestinationsTable.EntryType(
+ destination,
+ retry_last_ts,
+ retry_interval
+ )
+ )
+
+ # XXX: we could chose to not bother persisting this if our cache thinks
+ # this is a NOOP
+ return self.runInteraction(
+ "set_destination_retry_timings",
+ self._set_destination_retry_timings,
+ destination,
+ retry_last_ts,
+ retry_interval,
+ )
+
+ def _set_destination_retry_timings(cls, txn, destination,
+ retry_last_ts, retry_interval):
+
+ query = (
+ "INSERT OR REPLACE INTO %s "
+ "(destination, retry_last_ts, retry_interval) "
+ "VALUES (?, ?, ?) "
+ ) % DestinationsTable.table_name
+
+ txn.execute(query, (destination, retry_last_ts, retry_interval))
+
+ def get_destinations_needing_retry(self):
+ """Get all destinations which are due a retry for sending a transaction.
+
+ Returns:
+ list: A list of `DestinationsTable.EntryType`
+ """
+
+ return self.runInteraction(
+ "get_destinations_needing_retry",
+ self._get_destinations_needing_retry
+ )
+
+ def _get_destinations_needing_retry(cls, txn):
+ where = "retry_last_ts > 0 and retry_next_ts < now()"
+ query = DestinationsTable.select_statement(where)
+ txn.execute(query)
+ return DestinationsTable.decode_results(txn.fetchall())
+
class ReceivedTransactionsTable(Table):
table_name = "received_transactions"
@@ -247,3 +339,15 @@ class TransactionsToPduTable(Table):
]
EntryType = namedtuple("TransactionsToPduEntry", fields)
+
+
+class DestinationsTable(Table):
+ table_name = "destinations"
+
+ fields = [
+ "destination",
+ "retry_last_ts",
+ "retry_interval",
+ ]
+
+ EntryType = namedtuple("DestinationsEntry", fields)
|