diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/transactions.py | 166 |
1 files changed, 3 insertions, 163 deletions
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 232ccb96e1..59ece7b811 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -16,13 +16,12 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached -from twisted.internet import defer, reactor +from twisted.internet import defer from canonicaljson import encode_canonical_json from collections import namedtuple -import itertools import logging import ujson as json @@ -47,25 +46,6 @@ class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ - def __init__(self, hs): - super(TransactionStore, self).__init__(hs) - - # New transactions that are currently in flights - self.inflight_transactions = {} - - # Newly delievered transactions that *weren't* persisted while in flight - self.new_delivered_transactions = {} - - # Newly delivered transactions that *were* persisted while in flight - self.update_delivered_transactions = {} - - self.last_transaction = {} - - reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) - self._clock.looping_call(self._persist_in_mem_txns, 10 * 1000) - - self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) - def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have already responded to it. If so, return the response code and response @@ -148,46 +128,7 @@ class TransactionStore(SQLBaseStore): Returns: list: A list of previous transaction ids. """ - - auto_id = self._transaction_id_gen.get_next() - - txn_row = _TransactionRow( - id=auto_id, - transaction_id=transaction_id, - destination=destination, - ts=origin_server_ts, - response_code=0, - response_json=None, - ) - - self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row - - prev_txn = self.last_transaction.get(destination) - if prev_txn: - return defer.succeed(prev_txn) - else: - return self.runInteraction( - "_get_prevs_txn", - self._get_prevs_txn, - destination, - ) - - def _get_prevs_txn(self, txn, destination): - # First we find out what the prev_txns should be. - # Since we know that we are only sending one transaction at a time, - # we can simply take the last one. - query = ( - "SELECT * FROM sent_transactions" - " WHERE destination = ?" - " ORDER BY id DESC LIMIT 1" - ) - - txn.execute(query, (destination,)) - results = self.cursor_to_dict(txn) - - prev_txns = [r["transaction_id"] for r in results] - - return prev_txns + return defer.succeed([]) def delivered_txn(self, transaction_id, destination, code, response_dict): """Persists the response for an outgoing transaction. @@ -198,52 +139,7 @@ class TransactionStore(SQLBaseStore): code (int) response_json (str) """ - - txn_row = self.inflight_transactions.get( - destination, {} - ).pop(transaction_id, None) - - self.last_transaction[destination] = transaction_id - - if txn_row: - d = self.new_delivered_transactions.setdefault(destination, {}) - d[transaction_id] = txn_row._replace( - response_code=code, - response_json=None, # For now, don't persist response - ) - else: - d = self.update_delivered_transactions.setdefault(destination, {}) - # For now, don't persist response - d[transaction_id] = _UpdateTransactionRow(code, None) - - def get_transactions_after(self, transaction_id, destination): - """Get all transactions after a given local transaction_id. - - Args: - transaction_id (str) - destination (str) - - Returns: - list: A list of dicts - """ - return self.runInteraction( - "get_transactions_after", - self._get_transactions_after, transaction_id, destination - ) - - def _get_transactions_after(self, txn, transaction_id, destination): - query = ( - "SELECT * FROM sent_transactions" - " WHERE destination = ? AND id >" - " (" - " SELECT id FROM sent_transactions" - " WHERE transaction_id = ? AND destination = ?" - " )" - ) - - txn.execute(query, (destination, transaction_id, destination)) - - return self.cursor_to_dict(txn) + pass @cached(max_entries=10000) def get_destination_retry_timings(self, destination): @@ -338,59 +234,3 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (self._clock.time_msec(),)) return self.cursor_to_dict(txn) - - @defer.inlineCallbacks - def _persist_in_mem_txns(self): - try: - inflight = self.inflight_transactions - new_delivered = self.new_delivered_transactions - update_delivered = self.update_delivered_transactions - - self.inflight_transactions = {} - self.new_delivered_transactions = {} - self.update_delivered_transactions = {} - - full_rows = [ - row._asdict() - for txn_map in itertools.chain(inflight.values(), new_delivered.values()) - for row in txn_map.values() - ] - - def f(txn): - if full_rows: - self._simple_insert_many_txn( - txn=txn, - table="sent_transactions", - values=full_rows - ) - - for dest, txn_map in update_delivered.items(): - for txn_id, update_row in txn_map.items(): - self._simple_update_one_txn( - txn, - table="sent_transactions", - keyvalues={ - "transaction_id": txn_id, - "destination": dest, - }, - updatevalues={ - "response_code": update_row.response_code, - "response_json": None, # For now, don't persist response - } - ) - - if full_rows or update_delivered: - yield self.runInteraction("_persist_in_mem_txns", f) - except: - logger.exception("Failed to persist transactions!") - - def _cleanup_transactions(self): - now = self._clock.time_msec() - month_ago = now - 30 * 24 * 60 * 60 * 1000 - six_hours_ago = now - 6 * 60 * 60 * 1000 - - def _cleanup_transactions_txn(txn): - txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,)) - txn.execute("DELETE FROM sent_transactions WHERE ts < ?", (six_hours_ago,)) - - return self.runInteraction("_persist_in_mem_txns", _cleanup_transactions_txn) |