From 921913935176f5bd3df5e5b960d87c94a2adb304 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 Aug 2016 15:23:39 +0100 Subject: Preserve some logcontexts --- synapse/storage/events.py | 16 ++++++++++------ synapse/storage/stream.py | 6 +++--- 2 files changed, 13 insertions(+), 9 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 97aef25321..57e5005285 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -20,7 +20,9 @@ from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event from synapse.util.async import ObservableDeferred -from synapse.util.logcontext import preserve_fn, PreserveLoggingContext +from synapse.util.logcontext import ( + preserve_fn, PreserveLoggingContext, preserve_context_over_deferred +) from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.api.constants import EventTypes @@ -202,7 +204,7 @@ class EventsStore(SQLBaseStore): deferreds = [] for room_id, evs_ctxs in partitioned.items(): - d = self._event_persist_queue.add_to_queue( + d = preserve_fn(self._event_persist_queue.add_to_queue)( room_id, evs_ctxs, backfilled=backfilled, current_state=None, @@ -212,7 +214,9 @@ class EventsStore(SQLBaseStore): for room_id in partitioned.keys(): self._maybe_start_persisting(room_id) - return defer.gatherResults(deferreds, consumeErrors=True) + return preserve_context_over_deferred( + defer.gatherResults(deferreds, consumeErrors=True) + ) @defer.inlineCallbacks @log_function @@ -225,7 +229,7 @@ class EventsStore(SQLBaseStore): self._maybe_start_persisting(event.room_id) - yield deferred + yield preserve_context_over_deferred(deferred) max_persisted_id = yield self._stream_id_gen.get_current_token() defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) @@ -1088,7 +1092,7 @@ class EventsStore(SQLBaseStore): if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] - res = yield defer.gatherResults( + res = yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], @@ -1097,7 +1101,7 @@ class EventsStore(SQLBaseStore): for row in rows ], consumeErrors=True - ) + )) defer.returnValue({ e.event.event_id: e diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 862c5c3ea1..0577a0525b 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -39,7 +39,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.storage.engines import PostgresEngine, Sqlite3Engine import logging @@ -234,12 +234,12 @@ class StreamStore(SQLBaseStore): results = {} room_ids = list(room_ids) for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): - res = yield defer.gatherResults([ + res = yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(self.get_room_events_stream_for_room)( room_id, from_key, to_key, limit, order=order, ) for room_id in rm_ids - ]) + ])) results.update(dict(zip(rm_ids, res))) defer.returnValue(results) -- cgit 1.4.1