summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py101
1 files changed, 54 insertions, 47 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index d2feee8dbb..57e5005285 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -20,8 +20,11 @@ from synapse.events import FrozenEvent, USE_FROZEN_DICTS
 from synapse.events.utils import prune_event
 
 from synapse.util.async import ObservableDeferred
-from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
+from synapse.util.logcontext import (
+    preserve_fn, PreserveLoggingContext, preserve_context_over_deferred
+)
 from synapse.util.logutils import log_function
+from synapse.util.metrics import Measure
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 
@@ -201,7 +204,7 @@ class EventsStore(SQLBaseStore):
 
         deferreds = []
         for room_id, evs_ctxs in partitioned.items():
-            d = self._event_persist_queue.add_to_queue(
+            d = preserve_fn(self._event_persist_queue.add_to_queue)(
                 room_id, evs_ctxs,
                 backfilled=backfilled,
                 current_state=None,
@@ -211,7 +214,9 @@ class EventsStore(SQLBaseStore):
         for room_id in partitioned.keys():
             self._maybe_start_persisting(room_id)
 
-        return defer.gatherResults(deferreds, consumeErrors=True)
+        return preserve_context_over_deferred(
+            defer.gatherResults(deferreds, consumeErrors=True)
+        )
 
     @defer.inlineCallbacks
     @log_function
@@ -224,7 +229,7 @@ class EventsStore(SQLBaseStore):
 
         self._maybe_start_persisting(event.room_id)
 
-        yield deferred
+        yield preserve_context_over_deferred(deferred)
 
         max_persisted_id = yield self._stream_id_gen.get_current_token()
         defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
@@ -600,7 +605,8 @@ class EventsStore(SQLBaseStore):
                 "rejections",
                 "redactions",
                 "room_memberships",
-                "state_events"
+                "state_events",
+                "topics"
             ):
                 txn.executemany(
                     "DELETE FROM %s WHERE event_id = ?" % (table,),
@@ -1086,7 +1092,7 @@ class EventsStore(SQLBaseStore):
         if not allow_rejected:
             rows[:] = [r for r in rows if not r["rejects"]]
 
-        res = yield defer.gatherResults(
+        res = yield preserve_context_over_deferred(defer.gatherResults(
             [
                 preserve_fn(self._get_event_from_row)(
                     row["internal_metadata"], row["json"], row["redacts"],
@@ -1095,7 +1101,7 @@ class EventsStore(SQLBaseStore):
                 for row in rows
             ],
             consumeErrors=True
-        )
+        ))
 
         defer.returnValue({
             e.event.event_id: e
@@ -1131,54 +1137,55 @@ class EventsStore(SQLBaseStore):
     @defer.inlineCallbacks
     def _get_event_from_row(self, internal_metadata, js, redacted,
                             rejected_reason=None):
-        d = json.loads(js)
-        internal_metadata = json.loads(internal_metadata)
-
-        if rejected_reason:
-            rejected_reason = yield self._simple_select_one_onecol(
-                table="rejections",
-                keyvalues={"event_id": rejected_reason},
-                retcol="reason",
-                desc="_get_event_from_row_rejected_reason",
-            )
+        with Measure(self._clock, "_get_event_from_row"):
+            d = json.loads(js)
+            internal_metadata = json.loads(internal_metadata)
+
+            if rejected_reason:
+                rejected_reason = yield self._simple_select_one_onecol(
+                    table="rejections",
+                    keyvalues={"event_id": rejected_reason},
+                    retcol="reason",
+                    desc="_get_event_from_row_rejected_reason",
+                )
 
-        original_ev = FrozenEvent(
-            d,
-            internal_metadata_dict=internal_metadata,
-            rejected_reason=rejected_reason,
-        )
+            original_ev = FrozenEvent(
+                d,
+                internal_metadata_dict=internal_metadata,
+                rejected_reason=rejected_reason,
+            )
 
-        redacted_event = None
-        if redacted:
-            redacted_event = prune_event(original_ev)
+            redacted_event = None
+            if redacted:
+                redacted_event = prune_event(original_ev)
 
-            redaction_id = yield self._simple_select_one_onecol(
-                table="redactions",
-                keyvalues={"redacts": redacted_event.event_id},
-                retcol="event_id",
-                desc="_get_event_from_row_redactions",
-            )
+                redaction_id = yield self._simple_select_one_onecol(
+                    table="redactions",
+                    keyvalues={"redacts": redacted_event.event_id},
+                    retcol="event_id",
+                    desc="_get_event_from_row_redactions",
+                )
 
-            redacted_event.unsigned["redacted_by"] = redaction_id
-            # Get the redaction event.
+                redacted_event.unsigned["redacted_by"] = redaction_id
+                # Get the redaction event.
 
-            because = yield self.get_event(
-                redaction_id,
-                check_redacted=False,
-                allow_none=True,
-            )
+                because = yield self.get_event(
+                    redaction_id,
+                    check_redacted=False,
+                    allow_none=True,
+                )
 
-            if because:
-                # It's fine to do add the event directly, since get_pdu_json
-                # will serialise this field correctly
-                redacted_event.unsigned["redacted_because"] = because
+                if because:
+                    # It's fine to do add the event directly, since get_pdu_json
+                    # will serialise this field correctly
+                    redacted_event.unsigned["redacted_because"] = because
 
-        cache_entry = _EventCacheEntry(
-            event=original_ev,
-            redacted_event=redacted_event,
-        )
+            cache_entry = _EventCacheEntry(
+                event=original_ev,
+                redacted_event=redacted_event,
+            )
 
-        self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
+            self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
 
         defer.returnValue(cache_entry)