From 64ddec1bc0a1d23a285d560e34986441b3f8c854 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 16 Jan 2018 11:47:36 +0000 Subject: Fix a logcontext leak in persist_events ObserveableDeferred expects its callbacks to be called without any logcontexts, whereas it turns out we were calling them with the logcontext of the request which initiated the persistence loop. It seems wrong that we are attributing work done in the persistence loop to the request that happened to initiate it, so let's solve this by dropping the logcontext for it. (I'm not sure this actually causes any real problems other than messages in the debug log, but let's clean it up anyway) --- synapse/storage/events.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d08f7571d7..ad1d782705 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -146,6 +146,9 @@ class _EventPeristenceQueue(object): try: queue = self._get_drainining_queue(room_id) for item in queue: + # handle_queue_loop runs in the sentinel logcontext, so + # there is no need to preserve_fn when running the + # callbacks on the deferred. try: ret = yield per_item_callback(item) item.deferred.callback(ret) @@ -157,7 +160,11 @@ class _EventPeristenceQueue(object): self._event_persist_queues[room_id] = queue self._currently_persisting_rooms.discard(room_id) - preserve_fn(handle_queue_loop)() + # set handle_queue_loop off on the background. We don't want to + # attribute work done in it to the current request, so we drop the + # logcontext altogether. + with PreserveLoggingContext(): + handle_queue_loop() def _get_drainining_queue(self, room_id): queue = self._event_persist_queues.setdefault(room_id, deque()) -- cgit 1.4.1 From 390093d45e1951b1a1d8a034667d2e84b3bf064d Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 17 Jan 2018 15:44:31 +0000 Subject: Split resolve_events into two functions ... so that the return type doesn't depend on the arg types --- synapse/state.py | 45 +++++++++++++++++++++++++++------------------ synapse/storage/events.py | 4 ++-- 2 files changed, 29 insertions(+), 20 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/state.py b/synapse/state.py index 9e624b4937..1f9abf9d3d 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -341,7 +341,7 @@ class StateHandler(object): if conflicted_state: logger.info("Resolving conflicted state for %r", room_id) with Measure(self.clock, "state._resolve_events"): - new_state = yield resolve_events( + new_state = yield resolve_events_with_factory( state_groups_ids.values(), state_map_factory=lambda ev_ids: self.store.get_events( ev_ids, get_prev_content=False, check_redacted=False, @@ -404,7 +404,7 @@ class StateHandler(object): } with Measure(self.clock, "state._resolve_events"): - new_state = resolve_events(state_set_ids, state_map) + new_state = resolve_events_with_state_map(state_set_ids, state_map) new_state = { key: state_map[ev_id] for key, ev_id in new_state.items() @@ -420,19 +420,17 @@ def _ordered_events(events): return sorted(events, key=key_func) -def resolve_events(state_sets, state_map_factory): +def resolve_events_with_state_map(state_sets, state_map): """ Args: state_sets(list): List of dicts of (type, state_key) -> event_id, which are the different state groups to resolve. - state_map_factory(dict|callable): If callable, then will be called - with a list of event_ids that are needed, and should return with - a Deferred of dict of event_id to event. Otherwise, should be - a dict from event_id to event of all events in state_sets. + state_map(dict): a dict from event_id to event, for all events in + state_sets. Returns - dict[(str, str), synapse.events.FrozenEvent] is a map from - (type, state_key) to event. + dict[(str, str), synapse.events.FrozenEvent]: + a map from (type, state_key) to event. """ if len(state_sets) == 1: return state_sets[0] @@ -441,13 +439,6 @@ def resolve_events(state_sets, state_map_factory): state_sets, ) - if callable(state_map_factory): - return _resolve_with_state_fac( - unconflicted_state, conflicted_state, state_map_factory - ) - - state_map = state_map_factory - auth_events = _create_auth_events_from_maps( unconflicted_state, conflicted_state, state_map ) @@ -491,8 +482,26 @@ def _seperate(state_sets): @defer.inlineCallbacks -def _resolve_with_state_fac(unconflicted_state, conflicted_state, - state_map_factory): +def resolve_events_with_factory(state_sets, state_map_factory): + """ + Args: + state_sets(list): List of dicts of (type, state_key) -> event_id, + which are the different state groups to resolve. + state_map_factory(func): will be called + with a list of event_ids that are needed, and should return with + a Deferred of dict of event_id to event. + + Returns + Deferred[dict[(str, str), synapse.events.FrozenEvent]]: + a map from (type, state_key) to event. + """ + if len(state_sets) == 1: + defer.returnValue(state_sets[0]) + + unconflicted_state, conflicted_state = _seperate( + state_sets, + ) + needed_events = set( event_id for event_ids in conflicted_state.itervalues() diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ad1d782705..c5292a5311 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -27,7 +27,7 @@ from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError -from synapse.state import resolve_events +from synapse.state import resolve_events_with_factory from synapse.util.caches.descriptors import cached from synapse.types import get_domain_from_id @@ -557,7 +557,7 @@ class EventsStore(SQLBaseStore): to_return.update(evs) defer.returnValue(to_return) - current_state = yield resolve_events( + current_state = yield resolve_events_with_factory( state_sets, state_map_factory=get_events, ) -- cgit 1.4.1 From 1224612a798ce9f14f0d44e1246f87da15a959f1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 17 Jan 2018 16:01:59 +0000 Subject: Log room when doing state resolution Mostly because it helps figure out what is prompting the resolution --- synapse/storage/events.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d08f7571d7..ba0da83642 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -528,6 +528,12 @@ class EventsStore(SQLBaseStore): # the events we have yet to persist, so we need a slightly more # complicated event lookup function than simply looking the events # up in the db. + + logger.info( + "Resolving state for %s with %i state sets", + room_id, len(state_sets), + ) + events_map = {ev.event_id: ev for ev, _ in events_context} @defer.inlineCallbacks -- cgit 1.4.1 From b387ee17b68e4398a8fa26fdf122b773a046e429 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Sat, 27 Jan 2018 14:00:11 +0000 Subject: Improve exception handling in persist_event 1. use `deferred.errback()` instead of `deferred.errback(e)`, which means that a Failure object will be constructed using the current exception state, *including* its stack trace - so the stack trace is saved in the Failure, leading to better exception reports. 2. Set `consumeErrors=True` on the ObservableDeferred, because we know that there will always be at least one observer - which avoids a spurious "CRITICAL: unhandled exception in Deferred" error in the logs --- synapse/storage/events.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 7a9cd3ec90..33fccfa7a8 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -110,7 +110,7 @@ class _EventPeristenceQueue(object): end_item.events_and_contexts.extend(events_and_contexts) return end_item.deferred.observe() - deferred = ObservableDeferred(defer.Deferred()) + deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True) queue.append(self._EventPersistQueueItem( events_and_contexts=events_and_contexts, @@ -152,8 +152,8 @@ class _EventPeristenceQueue(object): try: ret = yield per_item_callback(item) item.deferred.callback(ret) - except Exception as e: - item.deferred.errback(e) + except Exception: + item.deferred.errback() finally: queue = self._event_persist_queues.pop(room_id, None) if queue: -- cgit 1.4.1