diff options
author | Erik Johnston <erik@matrix.org> | 2016-08-24 14:28:57 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-08-24 14:28:57 +0100 |
commit | 90d5983d7afa89a4431fea053512190ae83c8f3a (patch) | |
tree | 6c86dd3497030f7e9b629264e210e05a6ea1ce76 /synapse/storage | |
parent | Bump changelog and version (diff) | |
parent | Merge pull request #1042 from matrix-org/erikj/preserve_log_contexts (diff) | |
download | synapse-90d5983d7afa89a4431fea053512190ae83c8f3a.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into release-v0.17.1
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/events.py | 16 | ||||
-rw-r--r-- | synapse/storage/schema/delta/34/received_txn_purge.py | 32 | ||||
-rw-r--r-- | synapse/storage/stream.py | 6 | ||||
-rw-r--r-- | synapse/storage/transactions.py | 17 |
4 files changed, 58 insertions, 13 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 97aef25321..57e5005285 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -20,7 +20,9 @@ from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event from synapse.util.async import ObservableDeferred -from synapse.util.logcontext import preserve_fn, PreserveLoggingContext +from synapse.util.logcontext import ( + preserve_fn, PreserveLoggingContext, preserve_context_over_deferred +) from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.api.constants import EventTypes @@ -202,7 +204,7 @@ class EventsStore(SQLBaseStore): deferreds = [] for room_id, evs_ctxs in partitioned.items(): - d = self._event_persist_queue.add_to_queue( + d = preserve_fn(self._event_persist_queue.add_to_queue)( room_id, evs_ctxs, backfilled=backfilled, current_state=None, @@ -212,7 +214,9 @@ class EventsStore(SQLBaseStore): for room_id in partitioned.keys(): self._maybe_start_persisting(room_id) - return defer.gatherResults(deferreds, consumeErrors=True) + return preserve_context_over_deferred( + defer.gatherResults(deferreds, consumeErrors=True) + ) @defer.inlineCallbacks @log_function @@ -225,7 +229,7 @@ class EventsStore(SQLBaseStore): self._maybe_start_persisting(event.room_id) - yield deferred + yield preserve_context_over_deferred(deferred) max_persisted_id = yield self._stream_id_gen.get_current_token() defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) @@ -1088,7 +1092,7 @@ class EventsStore(SQLBaseStore): if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] - res = yield defer.gatherResults( + res = yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], @@ -1097,7 +1101,7 @@ class EventsStore(SQLBaseStore): for row in rows ], consumeErrors=True - ) + )) defer.returnValue({ e.event.event_id: e diff --git a/synapse/storage/schema/delta/34/received_txn_purge.py b/synapse/storage/schema/delta/34/received_txn_purge.py new file mode 100644 index 0000000000..033144341c --- /dev/null +++ b/synapse/storage/schema/delta/34/received_txn_purge.py @@ -0,0 +1,32 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage.engines import PostgresEngine + +import logging + +logger = logging.getLogger(__name__) + + +def run_create(cur, database_engine, *args, **kwargs): + if isinstance(database_engine, PostgresEngine): + cur.execute("TRUNCATE received_transactions") + else: + cur.execute("DELETE FROM received_transactions") + + cur.execute("CREATE INDEX received_transactions_ts ON received_transactions(ts)") + + +def run_upgrade(cur, database_engine, *args, **kwargs): + pass diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 862c5c3ea1..0577a0525b 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -39,7 +39,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.storage.engines import PostgresEngine, Sqlite3Engine import logging @@ -234,12 +234,12 @@ class StreamStore(SQLBaseStore): results = {} room_ids = list(room_ids) for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): - res = yield defer.gatherResults([ + res = yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(self.get_room_events_stream_for_room)( room_id, from_key, to_key, limit, order=order, ) for room_id in rm_ids - ]) + ])) results.update(dict(zip(rm_ids, res))) defer.returnValue(results) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 6258ff1725..58d4de4f1d 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -62,10 +62,9 @@ class TransactionStore(SQLBaseStore): self.last_transaction = {} reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) - hs.get_clock().looping_call( - self._persist_in_mem_txns, - 1000, - ) + self._clock.looping_call(self._persist_in_mem_txns, 1000) + + self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have @@ -127,6 +126,7 @@ class TransactionStore(SQLBaseStore): "origin": origin, "response_code": code, "response_json": buffer(encode_canonical_json(response_dict)), + "ts": self._clock.time_msec(), }, or_ignore=True, desc="set_received_txn_response", @@ -383,3 +383,12 @@ class TransactionStore(SQLBaseStore): yield self.runInteraction("_persist_in_mem_txns", f) except: logger.exception("Failed to persist transactions!") + + def _cleanup_transactions(self): + now = self._clock.time_msec() + month_ago = now - 30 * 24 * 60 * 60 * 1000 + + def _cleanup_transactions_txn(txn): + txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,)) + + return self.runInteraction("_persist_in_mem_txns", _cleanup_transactions_txn) |