summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/transactions.py165
1 files changed, 121 insertions, 44 deletions
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index d338dfcf0a..6c7481a728 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -16,16 +16,56 @@
 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached
 
+from twisted.internet import defer, reactor
+
 from canonicaljson import encode_canonical_json
+
+from collections import namedtuple
+
+import itertools
 import logging
 
 logger = logging.getLogger(__name__)
 
 
+_TransactionRow = namedtuple(
+    "_TransactionRow", (
+        "id", "transaction_id", "destination", "ts", "response_code",
+        "response_json",
+    )
+)
+
+_UpdateTransactionRow = namedtuple(
+    "_TransactionRow", (
+        "response_code", "response_json",
+    )
+)
+
+
 class TransactionStore(SQLBaseStore):
     """A collection of queries for handling PDUs.
     """
 
+    def __init__(self, hs):
+        super(TransactionStore, self).__init__(hs)
+
+        # New transactions that are currently in flights
+        self.inflight_transactions = {}
+
+        # Newly delievered transactions that *weren't* persisted while in flight
+        self.new_delivered_transactions = {}
+
+        # Newly delivered transactions that *were* persisted while in flight
+        self.update_delivered_transactions = {}
+
+        self.last_transaction = {}
+
+        reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns)
+        hs.get_clock().looping_call(
+            self._persist_in_mem_txns,
+            1000,
+        )
+
     def get_received_txn_response(self, transaction_id, origin):
         """For an incoming transaction from a given origin, check if we have
         already responded to it. If so, return the response code and response
@@ -108,17 +148,30 @@ class TransactionStore(SQLBaseStore):
             list: A list of previous transaction ids.
         """
 
-        return self.runInteraction(
-            "prep_send_transaction",
-            self._prep_send_transaction,
-            transaction_id, destination, origin_server_ts
+        auto_id = self._transaction_id_gen.get_next()
+
+        txn_row = _TransactionRow(
+            id=auto_id,
+            transaction_id=transaction_id,
+            destination=destination,
+            ts=origin_server_ts,
+            response_code=0,
+            response_json=None,
         )
 
-    def _prep_send_transaction(self, txn, transaction_id, destination,
-                               origin_server_ts):
+        self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row
 
-        next_id = self._transaction_id_gen.get_next()
+        prev_txn = self.last_transaction.get(destination)
+        if prev_txn:
+            return defer.succeed(prev_txn)
+        else:
+            return self.runInteraction(
+                "_get_prevs_txn",
+                self._get_prevs_txn,
+                destination,
+            )
 
+    def _get_prevs_txn(self, txn, destination):
         # First we find out what the prev_txns should be.
         # Since we know that we are only sending one transaction at a time,
         # we can simply take the last one.
@@ -133,23 +186,6 @@ class TransactionStore(SQLBaseStore):
 
         prev_txns = [r["transaction_id"] for r in results]
 
-        # Actually add the new transaction to the sent_transactions table.
-
-        self._simple_insert_txn(
-            txn,
-            table="sent_transactions",
-            values={
-                "id": next_id,
-                "transaction_id": transaction_id,
-                "destination": destination,
-                "ts": origin_server_ts,
-                "response_code": 0,
-                "response_json": None,
-            }
-        )
-
-        # TODO Update the tx id -> pdu id mapping
-
         return prev_txns
 
     def delivered_txn(self, transaction_id, destination, code, response_dict):
@@ -161,27 +197,23 @@ class TransactionStore(SQLBaseStore):
             code (int)
             response_json (str)
         """
-        return self.runInteraction(
-            "delivered_txn",
-            self._delivered_txn,
-            transaction_id, destination, code,
-            buffer(encode_canonical_json(response_dict)),
-        )
 
-    def _delivered_txn(self, txn, transaction_id, destination,
-                       code, response_json):
-        self._simple_update_one_txn(
-            txn,
-            table="sent_transactions",
-            keyvalues={
-                "transaction_id": transaction_id,
-                "destination": destination,
-            },
-            updatevalues={
-                "response_code": code,
-                "response_json": None,  # For now, don't persist response_json
-            }
-        )
+        txn_row = self.inflight_transactions.get(
+            destination, {}
+        ).pop(transaction_id, None)
+
+        self.last_transaction[destination] = transaction_id
+
+        if txn_row:
+            d = self.new_delivered_transactions.setdefault(destination, {})
+            d[transaction_id] = txn_row._replace(
+                response_code=code,
+                response_json=None,  # For now, don't persist response
+            )
+        else:
+            d = self.update_delivered_transactions.setdefault(destination, {})
+            # For now, don't persist response
+            d[transaction_id] = _UpdateTransactionRow(code, None)
 
     def get_transactions_after(self, transaction_id, destination):
         """Get all transactions after a given local transaction_id.
@@ -305,3 +337,48 @@ class TransactionStore(SQLBaseStore):
 
         txn.execute(query, (self._clock.time_msec(),))
         return self.cursor_to_dict(txn)
+
+    @defer.inlineCallbacks
+    def _persist_in_mem_txns(self):
+        try:
+            inflight = self.inflight_transactions
+            new_delivered = self.new_delivered_transactions
+            update_delivered = self.update_delivered_transactions
+
+            self.inflight_transactions = {}
+            self.new_delivered_transactions = {}
+            self.update_delivered_transactions = {}
+
+            full_rows = [
+                row._asdict()
+                for txn_map in itertools.chain(inflight.values(), new_delivered.values())
+                for row in txn_map.values()
+            ]
+
+            def f(txn):
+                if full_rows:
+                    self._simple_insert_many_txn(
+                        txn=txn,
+                        table="sent_transactions",
+                        values=full_rows
+                    )
+
+                for dest, txn_map in update_delivered.items():
+                    for txn_id, update_row in txn_map.items():
+                        self._simple_update_one_txn(
+                            txn,
+                            table="sent_transactions",
+                            keyvalues={
+                                "transaction_id": txn_id,
+                                "destination": dest,
+                            },
+                            updatevalues={
+                                "response_code": update_row.response_code,
+                                "response_json": None,  # For now, don't persist response
+                            }
+                        )
+
+            if full_rows or update_delivered:
+                yield self.runInteraction("_persist_in_mem_txns", f)
+        except:
+            logger.exception("Failed to persist transactions!")