diff options
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 101 |
1 files changed, 54 insertions, 47 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d2feee8dbb..57e5005285 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -20,8 +20,11 @@ from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event from synapse.util.async import ObservableDeferred -from synapse.util.logcontext import preserve_fn, PreserveLoggingContext +from synapse.util.logcontext import ( + preserve_fn, PreserveLoggingContext, preserve_context_over_deferred +) from synapse.util.logutils import log_function +from synapse.util.metrics import Measure from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError @@ -201,7 +204,7 @@ class EventsStore(SQLBaseStore): deferreds = [] for room_id, evs_ctxs in partitioned.items(): - d = self._event_persist_queue.add_to_queue( + d = preserve_fn(self._event_persist_queue.add_to_queue)( room_id, evs_ctxs, backfilled=backfilled, current_state=None, @@ -211,7 +214,9 @@ class EventsStore(SQLBaseStore): for room_id in partitioned.keys(): self._maybe_start_persisting(room_id) - return defer.gatherResults(deferreds, consumeErrors=True) + return preserve_context_over_deferred( + defer.gatherResults(deferreds, consumeErrors=True) + ) @defer.inlineCallbacks @log_function @@ -224,7 +229,7 @@ class EventsStore(SQLBaseStore): self._maybe_start_persisting(event.room_id) - yield deferred + yield preserve_context_over_deferred(deferred) max_persisted_id = yield self._stream_id_gen.get_current_token() defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) @@ -600,7 +605,8 @@ class EventsStore(SQLBaseStore): "rejections", "redactions", "room_memberships", - "state_events" + "state_events", + "topics" ): txn.executemany( "DELETE FROM %s WHERE event_id = ?" % (table,), @@ -1086,7 +1092,7 @@ class EventsStore(SQLBaseStore): if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] - res = yield defer.gatherResults( + res = yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], @@ -1095,7 +1101,7 @@ class EventsStore(SQLBaseStore): for row in rows ], consumeErrors=True - ) + )) defer.returnValue({ e.event.event_id: e @@ -1131,54 +1137,55 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def _get_event_from_row(self, internal_metadata, js, redacted, rejected_reason=None): - d = json.loads(js) - internal_metadata = json.loads(internal_metadata) - - if rejected_reason: - rejected_reason = yield self._simple_select_one_onecol( - table="rejections", - keyvalues={"event_id": rejected_reason}, - retcol="reason", - desc="_get_event_from_row_rejected_reason", - ) + with Measure(self._clock, "_get_event_from_row"): + d = json.loads(js) + internal_metadata = json.loads(internal_metadata) + + if rejected_reason: + rejected_reason = yield self._simple_select_one_onecol( + table="rejections", + keyvalues={"event_id": rejected_reason}, + retcol="reason", + desc="_get_event_from_row_rejected_reason", + ) - original_ev = FrozenEvent( - d, - internal_metadata_dict=internal_metadata, - rejected_reason=rejected_reason, - ) + original_ev = FrozenEvent( + d, + internal_metadata_dict=internal_metadata, + rejected_reason=rejected_reason, + ) - redacted_event = None - if redacted: - redacted_event = prune_event(original_ev) + redacted_event = None + if redacted: + redacted_event = prune_event(original_ev) - redaction_id = yield self._simple_select_one_onecol( - table="redactions", - keyvalues={"redacts": redacted_event.event_id}, - retcol="event_id", - desc="_get_event_from_row_redactions", - ) + redaction_id = yield self._simple_select_one_onecol( + table="redactions", + keyvalues={"redacts": redacted_event.event_id}, + retcol="event_id", + desc="_get_event_from_row_redactions", + ) - redacted_event.unsigned["redacted_by"] = redaction_id - # Get the redaction event. + redacted_event.unsigned["redacted_by"] = redaction_id + # Get the redaction event. - because = yield self.get_event( - redaction_id, - check_redacted=False, - allow_none=True, - ) + because = yield self.get_event( + redaction_id, + check_redacted=False, + allow_none=True, + ) - if because: - # It's fine to do add the event directly, since get_pdu_json - # will serialise this field correctly - redacted_event.unsigned["redacted_because"] = because + if because: + # It's fine to do add the event directly, since get_pdu_json + # will serialise this field correctly + redacted_event.unsigned["redacted_because"] = because - cache_entry = _EventCacheEntry( - event=original_ev, - redacted_event=redacted_event, - ) + cache_entry = _EventCacheEntry( + event=original_ev, + redacted_event=redacted_event, + ) - self._get_event_cache.prefill((original_ev.event_id,), cache_entry) + self._get_event_cache.prefill((original_ev.event_id,), cache_entry) defer.returnValue(cache_entry) |