From 56b5e83e36f22f3eab1ebf7a46b9f23f0c1a3e8d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 May 2016 11:20:18 +0100 Subject: Reduce database inserts when sending transactions --- synapse/handlers/presence.py | 2 +- synapse/storage/transactions.py | 157 +++++++++++++++++++++++++++++----------- 2 files changed, 114 insertions(+), 45 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d0c8f1328b..639567953a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -168,7 +168,7 @@ class PresenceHandler(BaseHandler): # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. self.clock.call_later( - 0 * 1000, + 30 * 1000, self.clock.looping_call, self._handle_timeouts, 5000, diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index d338dfcf0a..17fc601983 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -16,16 +16,54 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached +from twisted.internet import defer, reactor + from canonicaljson import encode_canonical_json + +from collections import namedtuple + +import itertools import logging logger = logging.getLogger(__name__) +_TransactionRow = namedtuple( + "_TransactionRow", ( + "id", "transaction_id", "destination", "ts", "response_code", + "response_json", + ) +) + +_UpdateTransactionRow = namedtuple( + "_TransactionRow", ( + "response_code", "response_json", + ) +) + + class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ + def __init__(self, hs): + super(TransactionStore, self).__init__(hs) + + # New transactions that are currently in flights + self.inflight_transactions = {} + + # Newly delievered transactions that *weren't* persisted while in flight + self.new_delivered_transactions = {} + + # Newly delivered transactions that *were* persisted while in flight + self.update_delivered_transactions = {} + + reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) + hs.get_clock().looping_call( + self._persist_in_mem_txns, + 1000, + ) + def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have already responded to it. If so, return the response code and response @@ -108,17 +146,28 @@ class TransactionStore(SQLBaseStore): list: A list of previous transaction ids. """ - return self.runInteraction( - "prep_send_transaction", - self._prep_send_transaction, - transaction_id, destination, origin_server_ts + auto_id = self._transaction_id_gen.get_next() + + txn_row = _TransactionRow( + id=auto_id, + transaction_id=transaction_id, + destination=destination, + ts=origin_server_ts, + response_code=0, + response_json=None, ) - def _prep_send_transaction(self, txn, transaction_id, destination, - origin_server_ts): + self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row + + # TODO: Fetch prev_txns - next_id = self._transaction_id_gen.get_next() + return self.runInteraction( + "prep_send_transaction", + self._get_prevs_txn, + destination, + ) + def _get_prevs_txn(self, txn, destination): # First we find out what the prev_txns should be. # Since we know that we are only sending one transaction at a time, # we can simply take the last one. @@ -133,23 +182,6 @@ class TransactionStore(SQLBaseStore): prev_txns = [r["transaction_id"] for r in results] - # Actually add the new transaction to the sent_transactions table. - - self._simple_insert_txn( - txn, - table="sent_transactions", - values={ - "id": next_id, - "transaction_id": transaction_id, - "destination": destination, - "ts": origin_server_ts, - "response_code": 0, - "response_json": None, - } - ) - - # TODO Update the tx id -> pdu id mapping - return prev_txns def delivered_txn(self, transaction_id, destination, code, response_dict): @@ -161,27 +193,21 @@ class TransactionStore(SQLBaseStore): code (int) response_json (str) """ - return self.runInteraction( - "delivered_txn", - self._delivered_txn, - transaction_id, destination, code, - buffer(encode_canonical_json(response_dict)), - ) - def _delivered_txn(self, txn, transaction_id, destination, - code, response_json): - self._simple_update_one_txn( - txn, - table="sent_transactions", - keyvalues={ - "transaction_id": transaction_id, - "destination": destination, - }, - updatevalues={ - "response_code": code, - "response_json": None, # For now, don't persist response_json - } - ) + txn_row = self.inflight_transactions.get( + destination, {} + ).pop(transaction_id, None) + + if txn_row: + d = self.new_delivered_transactions.setdefault(destination, {}) + d[transaction_id] = txn_row._replace( + response_code=code, + response_json=None, # For now, don't persist response + ) + else: + d = self.update_delivered_transactions.setdefault(destination, {}) + # For now, don't persist response + d[transaction_id] = _UpdateTransactionRow(code, None) def get_transactions_after(self, transaction_id, destination): """Get all transactions after a given local transaction_id. @@ -305,3 +331,46 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (self._clock.time_msec(),)) return self.cursor_to_dict(txn) + + @defer.inlineCallbacks + def _persist_in_mem_txns(self): + try: + inflight = self.inflight_transactions + new_delivered = self.new_delivered_transactions + update_delivered = self.update_delivered_transactions + + self.inflight_transactions = {} + self.new_delivered_transactions = {} + self.update_delivered_transactions = {} + + full_rows = [ + row._asdict() + for txn_map in itertools.chain(inflight.values(), new_delivered.values()) + for row in txn_map.values() + ] + + def f(txn): + self._simple_insert_many_txn( + txn=txn, + table="sent_transactions", + values=full_rows + ) + + for dest, txn_map in update_delivered.items(): + for txn_id, update_row in txn_map.items(): + self._simple_update_one_txn( + txn, + table="sent_transactions", + keyvalues={ + "transaction_id": txn_id, + "destination": dest, + }, + updatevalues={ + "response_code": update_row.response_code, + "response_json": None, # For now, don't persist response + } + ) + + yield self.runInteraction("_persist_in_mem_txns", f) + except: + logger.exception("Failed to persist transactions!") -- cgit 1.4.1 From 1d275dba69272f6df5a79871dc4b8d31e71514d0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 May 2016 11:25:58 +0100 Subject: Don't needlessly enter transaction --- synapse/storage/transactions.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 17fc601983..ba1969b243 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -159,10 +159,8 @@ class TransactionStore(SQLBaseStore): self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row - # TODO: Fetch prev_txns - return self.runInteraction( - "prep_send_transaction", + "_get_prevs_txn", self._get_prevs_txn, destination, ) @@ -350,11 +348,12 @@ class TransactionStore(SQLBaseStore): ] def f(txn): - self._simple_insert_many_txn( - txn=txn, - table="sent_transactions", - values=full_rows - ) + if full_rows: + self._simple_insert_many_txn( + txn=txn, + table="sent_transactions", + values=full_rows + ) for dest, txn_map in update_delivered.items(): for txn_id, update_row in txn_map.items(): @@ -371,6 +370,7 @@ class TransactionStore(SQLBaseStore): } ) - yield self.runInteraction("_persist_in_mem_txns", f) + if full_rows or update_delivered: + yield self.runInteraction("_persist_in_mem_txns", f) except: logger.exception("Failed to persist transactions!") -- cgit 1.4.1 From d13459636fab9637a43c950d17f883ca77b832f7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 May 2016 11:30:55 +0100 Subject: Pull prev txn from in memory --- synapse/storage/transactions.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index ba1969b243..6c7481a728 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -58,6 +58,8 @@ class TransactionStore(SQLBaseStore): # Newly delivered transactions that *were* persisted while in flight self.update_delivered_transactions = {} + self.last_transaction = {} + reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) hs.get_clock().looping_call( self._persist_in_mem_txns, @@ -159,11 +161,15 @@ class TransactionStore(SQLBaseStore): self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row - return self.runInteraction( - "_get_prevs_txn", - self._get_prevs_txn, - destination, - ) + prev_txn = self.last_transaction.get(destination) + if prev_txn: + return defer.succeed(prev_txn) + else: + return self.runInteraction( + "_get_prevs_txn", + self._get_prevs_txn, + destination, + ) def _get_prevs_txn(self, txn, destination): # First we find out what the prev_txns should be. @@ -196,6 +202,8 @@ class TransactionStore(SQLBaseStore): destination, {} ).pop(transaction_id, None) + self.last_transaction[destination] = transaction_id + if txn_row: d = self.new_delivered_transactions.setdefault(destination, {}) d[transaction_id] = txn_row._replace( -- cgit 1.4.1