diff options
author | Richard van der Hoff <richard@matrix.org> | 2018-02-03 22:40:28 +0000 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2018-02-03 22:40:28 +0000 |
commit | bb9f0f3cdb57882248b6426dd4ec5bd5781daaef (patch) | |
tree | 41a8ee0b215406119ec5ef44cf0a3326a8ae1636 /synapse/storage/events.py | |
parent | oops (diff) | |
parent | Merge pull request #2837 from matrix-org/rav/fix_quarantine_media (diff) | |
download | synapse-bb9f0f3cdb57882248b6426dd4ec5bd5781daaef.tar.xz |
Merge branch 'develop' into matthew/gin_work_mem
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 25 |
1 files changed, 19 insertions, 6 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d08f7571d7..33fccfa7a8 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -27,7 +27,7 @@ from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError -from synapse.state import resolve_events +from synapse.state import resolve_events_with_factory from synapse.util.caches.descriptors import cached from synapse.types import get_domain_from_id @@ -110,7 +110,7 @@ class _EventPeristenceQueue(object): end_item.events_and_contexts.extend(events_and_contexts) return end_item.deferred.observe() - deferred = ObservableDeferred(defer.Deferred()) + deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True) queue.append(self._EventPersistQueueItem( events_and_contexts=events_and_contexts, @@ -146,18 +146,25 @@ class _EventPeristenceQueue(object): try: queue = self._get_drainining_queue(room_id) for item in queue: + # handle_queue_loop runs in the sentinel logcontext, so + # there is no need to preserve_fn when running the + # callbacks on the deferred. try: ret = yield per_item_callback(item) item.deferred.callback(ret) - except Exception as e: - item.deferred.errback(e) + except Exception: + item.deferred.errback() finally: queue = self._event_persist_queues.pop(room_id, None) if queue: self._event_persist_queues[room_id] = queue self._currently_persisting_rooms.discard(room_id) - preserve_fn(handle_queue_loop)() + # set handle_queue_loop off on the background. We don't want to + # attribute work done in it to the current request, so we drop the + # logcontext altogether. + with PreserveLoggingContext(): + handle_queue_loop() def _get_drainining_queue(self, room_id): queue = self._event_persist_queues.setdefault(room_id, deque()) @@ -528,6 +535,12 @@ class EventsStore(SQLBaseStore): # the events we have yet to persist, so we need a slightly more # complicated event lookup function than simply looking the events # up in the db. + + logger.info( + "Resolving state for %s with %i state sets", + room_id, len(state_sets), + ) + events_map = {ev.event_id: ev for ev, _ in events_context} @defer.inlineCallbacks @@ -550,7 +563,7 @@ class EventsStore(SQLBaseStore): to_return.update(evs) defer.returnValue(to_return) - current_state = yield resolve_events( + current_state = yield resolve_events_with_factory( state_sets, state_map_factory=get_events, ) |