summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2014-12-07 02:26:07 +0000
committerMatthew Hodgson <matthew@matrix.org>2014-12-07 02:26:07 +0000
commitaed62a35832a3ec1c7425ecc99cab06a781263ba (patch)
tree6dd8bdeb4f79084a084f24f6747c1e1d00fcf973 /synapse/storage
parentPull in latest matrix-angular_sdk (diff)
downloadsynapse-aed62a35832a3ec1c7425ecc99cab06a781263ba.tar.xz
track replication destination health, and perform exponential back-off when sending transactions. does *not* yet retry transactions, but drops them on the floor if waiting for a server to recover.
Diffstat (limited to '')
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/_base.py2
-rw-r--r--synapse/storage/schema/delta/v9.sql23
-rw-r--r--synapse/storage/schema/transactions.sql6
-rw-r--r--synapse/storage/transactions.py78
5 files changed, 108 insertions, 3 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f15e3dfe62..04ab39341d 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -67,7 +67,7 @@ SCHEMAS = [
 
 # Remember to update this number every time an incompatible change is made to
 # database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 8
+SCHEMA_VERSION = 9
 
 
 class _RollbackButIsFineException(Exception):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 4881f03368..e72200e2f7 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -650,7 +650,7 @@ class JoinHelper(object):
     to dump the results into.
 
     Attributes:
-        taples (list): List of `Table` classes
+        tables (list): List of `Table` classes
         EntryType (type)
     """
 
diff --git a/synapse/storage/schema/delta/v9.sql b/synapse/storage/schema/delta/v9.sql
new file mode 100644
index 0000000000..ad680c64da
--- /dev/null
+++ b/synapse/storage/schema/delta/v9.sql
@@ -0,0 +1,23 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+    destination TEXT PRIMARY KEY,
+    retry_last_ts INTEGER,
+    retry_interval INTEGER
+);
+
+PRAGMA user_version = 9;
\ No newline at end of file
diff --git a/synapse/storage/schema/transactions.sql b/synapse/storage/schema/transactions.sql
index 88e3e4e04d..de461bfa15 100644
--- a/synapse/storage/schema/transactions.sql
+++ b/synapse/storage/schema/transactions.sql
@@ -59,3 +59,9 @@ CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(tra
 CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
 CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination);
 
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+    destination TEXT PRIMARY KEY,
+    retry_last_ts INTEGER,
+    retry_interval INTEGER
+);
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 00d0f48082..47b73f7458 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -114,7 +114,7 @@ class TransactionStore(SQLBaseStore):
     def _prep_send_transaction(self, txn, transaction_id, destination,
                                origin_server_ts):
 
-        # First we find out what the prev_txs should be.
+        # First we find out what the prev_txns should be.
         # Since we know that we are only sending one transaction at a time,
         # we can simply take the last one.
         query = "%s ORDER BY id DESC LIMIT 1" % (
@@ -205,6 +205,71 @@ class TransactionStore(SQLBaseStore):
 
         return ReceivedTransactionsTable.decode_results(txn.fetchall())
 
+    def get_destination_retry_timings(self, destination):
+        """Gets the current retry timings (if any) for a given destination.
+        
+        Args:
+            destination (str)
+        
+        Returns:
+            None if not retrying
+            tuple: (retry_last_ts, retry_interval)
+                retry_ts: time of last retry attempt in unix epoch ms
+                retry_interval: how long until next retry in ms
+        """
+        return self.runInteraction(
+            "get_destination_retry_timings",
+            self._get_destination_retry_timings, destination)
+            
+    def _get_destination_retry_timings(cls, txn, destination):
+        query = DestinationsTable.select_statement("destination = ?")
+        txn.execute(query, (destination,))
+        result = DestinationsTable.decode_single_result(txn.fetchone())
+        if result and result[0] > 0:
+            return result
+        else:
+            return None
+        
+    def set_destination_retry_timings(self, destination):
+        """Sets the current retry timings for a given destination.
+        Both timings should be zero if retrying is no longer occuring.
+        
+        Args:
+            destination (str)
+            retry_last_ts (int) - time of last retry attempt in unix epoch ms
+            retry_interval (int) - how long until next retry in ms
+        """
+        return self.runInteraction(
+            "set_destination_retry_timings",
+            self._set_destination_retry_timings, destination, retry_last_ts, retry_interval)
+            
+    def _set_destination_retry_timings(cls, txn, destination, retry_last_ts, retry_interval):
+
+        query = (
+            "INSERT OR REPLACE INTO %s "
+            "(retry_last_ts, retry_interval) "
+            "VALUES (?, ?) "
+            "WHERE destination = ?"
+        ) % DestinationsTable.table_name
+
+        txn.execute(query, (retry_last_ts, retry_interval, destination))
+
+    def get_destinations_needing_retry(self):
+        """Get all destinations which are due a retry for sending a transaction.
+                    
+        Returns:
+            list: A list of `DestinationsTable.EntryType`
+        """
+        return self.runInteraction(
+            "get_destinations_needing_retry",
+            self._get_destinations_needing_retry
+        )
+        
+    def _get_destinations_needing_retry(cls, txn):
+        where = "retry_last_ts > 0 and retry_next_ts < now()"
+        query = DestinationsTable.select_statement(where)
+        txn.execute(query)
+        return DestinationsTable.decode_results(txn.fetchall())    
 
 class ReceivedTransactionsTable(Table):
     table_name = "received_transactions"
@@ -247,3 +312,14 @@ class TransactionsToPduTable(Table):
     ]
 
     EntryType = namedtuple("TransactionsToPduEntry", fields)
+    
+class DestinationsTable(Table):
+    table_name = "destinations"
+    
+    fields = [
+        "destination",
+        "retry_last_ts",
+        "retry_interval",
+    ]
+
+    EntryType = namedtuple("DestinationsEntry", fields)
\ No newline at end of file