diff options
author | Erik Johnston <erikj@jki.re> | 2016-08-23 11:02:40 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-08-23 11:02:40 +0100 |
commit | 122c7a43c90f0e2cee5b4bf2d169b405184746d0 (patch) | |
tree | a9636f6aba8d656f0d6f761970fce9a3643a9099 | |
parent | Merge pull request #1028 from matrix-org/dbkr/notifications_api (diff) | |
parent | Delete old received_transactions (diff) | |
download | synapse-122c7a43c90f0e2cee5b4bf2d169b405184746d0.tar.xz |
Merge pull request #1038 from matrix-org/erikj/receved_txn_purge
Delete old received_transactions rows
Diffstat (limited to '')
-rw-r--r-- | synapse/storage/schema/delta/34/received_txn_purge.py | 32 | ||||
-rw-r--r-- | synapse/storage/transactions.py | 17 |
2 files changed, 45 insertions, 4 deletions
diff --git a/synapse/storage/schema/delta/34/received_txn_purge.py b/synapse/storage/schema/delta/34/received_txn_purge.py new file mode 100644 index 0000000000..033144341c --- /dev/null +++ b/synapse/storage/schema/delta/34/received_txn_purge.py @@ -0,0 +1,32 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage.engines import PostgresEngine + +import logging + +logger = logging.getLogger(__name__) + + +def run_create(cur, database_engine, *args, **kwargs): + if isinstance(database_engine, PostgresEngine): + cur.execute("TRUNCATE received_transactions") + else: + cur.execute("DELETE FROM received_transactions") + + cur.execute("CREATE INDEX received_transactions_ts ON received_transactions(ts)") + + +def run_upgrade(cur, database_engine, *args, **kwargs): + pass diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 6258ff1725..58d4de4f1d 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -62,10 +62,9 @@ class TransactionStore(SQLBaseStore): self.last_transaction = {} reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) - hs.get_clock().looping_call( - self._persist_in_mem_txns, - 1000, - ) + self._clock.looping_call(self._persist_in_mem_txns, 1000) + + self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have @@ -127,6 +126,7 @@ class TransactionStore(SQLBaseStore): "origin": origin, "response_code": code, "response_json": buffer(encode_canonical_json(response_dict)), + "ts": self._clock.time_msec(), }, or_ignore=True, desc="set_received_txn_response", @@ -383,3 +383,12 @@ class TransactionStore(SQLBaseStore): yield self.runInteraction("_persist_in_mem_txns", f) except: logger.exception("Failed to persist transactions!") + + def _cleanup_transactions(self): + now = self._clock.time_msec() + month_ago = now - 30 * 24 * 60 * 60 * 1000 + + def _cleanup_transactions_txn(txn): + txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,)) + + return self.runInteraction("_persist_in_mem_txns", _cleanup_transactions_txn) |