summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-05-13 16:59:41 +0100
committerErik Johnston <erik@matrix.org>2015-05-13 16:59:41 +0100
commit4071f2965320950c7f1bbdd39105f8c34ca95034 (patch)
tree2fc19964d13772cd142dcd86d8898fe8805a90c1 /synapse/storage
parentTypo (diff)
downloadsynapse-4071f2965320950c7f1bbdd39105f8c34ca95034.tar.xz
Fetch events from events_id in their own transactions
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py154
-rw-r--r--synapse/storage/state.py10
-rw-r--r--synapse/storage/stream.py22
3 files changed, 168 insertions, 18 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index b21056f617..f6c1ec424a 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -867,11 +867,26 @@ class SQLBaseStore(object):
 
         return self.runInteraction("_simple_max_id", func)
 
+    @defer.inlineCallbacks
     def _get_events(self, event_ids, check_redacted=True,
                     get_prev_content=False, desc="_get_events"):
-        return self.runInteraction(
-            desc, self._get_events_txn, event_ids,
-            check_redacted=check_redacted, get_prev_content=get_prev_content,
+        N = 50  # Only fetch 100 events at a time.
+
+        ds = [
+            self.runInteraction(
+                desc,
+                self._fetch_events_txn,
+                event_ids[i*N:(i+1)*N],
+                check_redacted=check_redacted,
+                get_prev_content=get_prev_content,
+            )
+            for i in range(1 + len(event_ids) / N)
+        ]
+
+        res = yield defer.gatherResults(ds, consumeErrors=True)
+
+        defer.returnValue(
+            list(itertools.chain(*res))
         )
 
     def _get_events_txn(self, txn, event_ids, check_redacted=True,
@@ -1007,6 +1022,139 @@ class SQLBaseStore(object):
             if e_id in event_map and event_map[e_id]
         ]
 
+    @defer.inlineCallbacks
+    def _fetch_events(self, events, check_redacted=True,
+                      get_prev_content=False, allow_rejected=False):
+        if not events:
+            defer.returnValue([])
+
+        event_map = {}
+
+        for event_id in events:
+            try:
+                ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content)
+
+                if allow_rejected or not ret.rejected_reason:
+                    event_map[event_id] = ret
+                else:
+                    event_map[event_id] = None
+            except KeyError:
+                pass
+
+        missing_events = [
+            e for e in events
+            if e not in event_map
+        ]
+
+        if missing_events:
+            sql = (
+                "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id "
+                " FROM event_json as e"
+                " LEFT JOIN rejections as rej USING (event_id)"
+                " LEFT JOIN redactions as r ON e.event_id = r.redacts"
+                " WHERE e.event_id IN (%s)"
+            ) % (",".join(["?"]*len(missing_events)),)
+
+            rows = yield self._execute(
+                "_fetch_events",
+                None,
+                sql,
+                *missing_events
+            )
+
+            res_ds = [
+                self._get_event_from_row(
+                    row[0], row[1], row[2],
+                    check_redacted=check_redacted,
+                    get_prev_content=get_prev_content,
+                    rejected_reason=row[3],
+                )
+                for row in rows
+            ]
+
+            res = yield defer.gatherResults(res_ds, consumeErrors=True)
+
+            event_map.update({
+                e.event_id: e
+                for e in res if e
+            })
+
+            for e in res:
+                self._get_event_cache.prefill(
+                    e.event_id, check_redacted, get_prev_content, e
+                )
+
+        defer.returnValue([
+            event_map[e_id] for e_id in events
+            if e_id in event_map and event_map[e_id]
+        ])
+
+    @defer.inlineCallbacks
+    def _get_event_from_row(self, internal_metadata, js, redacted,
+                            check_redacted=True, get_prev_content=False,
+                            rejected_reason=None):
+
+        start_time = time.time() * 1000
+
+        def update_counter(desc, last_time):
+            curr_time = self._get_event_counters.update(desc, last_time)
+            sql_getevents_timer.inc_by(curr_time - last_time, desc)
+            return curr_time
+
+        d = json.loads(js)
+        start_time = update_counter("decode_json", start_time)
+
+        internal_metadata = json.loads(internal_metadata)
+        start_time = update_counter("decode_internal", start_time)
+
+        if rejected_reason:
+            rejected_reason = yield self._simple_select_one_onecol(
+                desc="_get_event_from_row",
+                table="rejections",
+                keyvalues={"event_id": rejected_reason},
+                retcol="reason",
+            )
+
+        ev = FrozenEvent(
+            d,
+            internal_metadata_dict=internal_metadata,
+            rejected_reason=rejected_reason,
+        )
+        start_time = update_counter("build_frozen_event", start_time)
+
+        if check_redacted and redacted:
+            ev = prune_event(ev)
+
+            redaction_id = yield self._simple_select_one_onecol(
+                desc="_get_event_from_row",
+                table="redactions",
+                keyvalues={"redacts": ev.event_id},
+                retcol="event_id",
+            )
+
+            ev.unsigned["redacted_by"] = redaction_id
+            # Get the redaction event.
+
+            because = yield self.get_event_txn(
+                redaction_id,
+                check_redacted=False
+            )
+
+            if because:
+                ev.unsigned["redacted_because"] = because
+            start_time = update_counter("redact_event", start_time)
+
+        if get_prev_content and "replaces_state" in ev.unsigned:
+            prev = yield self.get_event(
+                ev.unsigned["replaces_state"],
+                get_prev_content=False,
+            )
+            if prev:
+                ev.unsigned["prev_content"] = prev.get_dict()["content"]
+            start_time = update_counter("get_prev_content", start_time)
+
+        defer.returnValue(ev)
+
     def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
                                 check_redacted=True, get_prev_content=False,
                                 rejected_reason=None):
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index a80a947436..483b316e9f 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -85,13 +85,13 @@ class StateStore(SQLBaseStore):
 
         @defer.inlineCallbacks
         def c(vals):
-            vals[:] = yield self.runInteraction(
-                "_get_state_groups_ev",
-                self._fetch_events_txn, vals
-            )
+            vals[:] = yield self._fetch_events(vals, get_prev_content=False)
 
         yield defer.gatherResults(
-            [c(vals) for vals in states.values()],
+            [
+                c(vals)
+                for vals in states.values()
+            ],
             consumeErrors=True,
         )
 
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 8045e17fd7..db9c2f0389 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -298,6 +298,7 @@ class StreamStore(SQLBaseStore):
 
         return self.runInteraction("paginate_room_events", f)
 
+    @defer.inlineCallbacks
     def get_recent_events_for_room(self, room_id, limit, end_token,
                                    with_feedback=False, from_token=None):
         # TODO (erikj): Handle compressed feedback
@@ -349,20 +350,21 @@ class StreamStore(SQLBaseStore):
             else:
                 token = (str(end_token), str(end_token))
 
-            events = self._get_events_txn(
-                txn,
-                [r["event_id"] for r in rows],
-                get_prev_content=True
-            )
+            return rows, token
 
-            self._set_before_and_after(events, rows)
-
-            return events, token
-
-        return self.runInteraction(
+        rows, token = yield self.runInteraction(
             "get_recent_events_for_room", get_recent_events_for_room_txn
         )
 
+        events = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
+
+        self._set_before_and_after(events, rows)
+
+        defer.returnValue((events, token))
+
     @defer.inlineCallbacks
     def get_room_events_max_id(self, direction='f'):
         token = yield self._stream_id_gen.get_max_token(self)