diff options
author | Erik Johnston <erik@matrix.org> | 2015-05-13 16:59:41 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-05-13 16:59:41 +0100 |
commit | 4071f2965320950c7f1bbdd39105f8c34ca95034 (patch) | |
tree | 2fc19964d13772cd142dcd86d8898fe8805a90c1 /synapse/storage | |
parent | Typo (diff) | |
download | synapse-4071f2965320950c7f1bbdd39105f8c34ca95034.tar.xz |
Fetch events from events_id in their own transactions
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/_base.py | 154 | ||||
-rw-r--r-- | synapse/storage/state.py | 10 | ||||
-rw-r--r-- | synapse/storage/stream.py | 22 |
3 files changed, 168 insertions, 18 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b21056f617..f6c1ec424a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -867,11 +867,26 @@ class SQLBaseStore(object): return self.runInteraction("_simple_max_id", func) + @defer.inlineCallbacks def _get_events(self, event_ids, check_redacted=True, get_prev_content=False, desc="_get_events"): - return self.runInteraction( - desc, self._get_events_txn, event_ids, - check_redacted=check_redacted, get_prev_content=get_prev_content, + N = 50 # Only fetch 100 events at a time. + + ds = [ + self.runInteraction( + desc, + self._fetch_events_txn, + event_ids[i*N:(i+1)*N], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + ) + for i in range(1 + len(event_ids) / N) + ] + + res = yield defer.gatherResults(ds, consumeErrors=True) + + defer.returnValue( + list(itertools.chain(*res)) ) def _get_events_txn(self, txn, event_ids, check_redacted=True, @@ -1007,6 +1022,139 @@ class SQLBaseStore(object): if e_id in event_map and event_map[e_id] ] + @defer.inlineCallbacks + def _fetch_events(self, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): + if not events: + defer.returnValue([]) + + event_map = {} + + for event_id in events: + try: + ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) + + if allow_rejected or not ret.rejected_reason: + event_map[event_id] = ret + else: + event_map[event_id] = None + except KeyError: + pass + + missing_events = [ + e for e in events + if e not in event_map + ] + + if missing_events: + sql = ( + "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " + " FROM event_json as e" + " LEFT JOIN rejections as rej USING (event_id)" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " WHERE e.event_id IN (%s)" + ) % (",".join(["?"]*len(missing_events)),) + + rows = yield self._execute( + "_fetch_events", + None, + sql, + *missing_events + ) + + res_ds = [ + self._get_event_from_row( + row[0], row[1], row[2], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=row[3], + ) + for row in rows + ] + + res = yield defer.gatherResults(res_ds, consumeErrors=True) + + event_map.update({ + e.event_id: e + for e in res if e + }) + + for e in res: + self._get_event_cache.prefill( + e.event_id, check_redacted, get_prev_content, e + ) + + defer.returnValue([ + event_map[e_id] for e_id in events + if e_id in event_map and event_map[e_id] + ]) + + @defer.inlineCallbacks + def _get_event_from_row(self, internal_metadata, js, redacted, + check_redacted=True, get_prev_content=False, + rejected_reason=None): + + start_time = time.time() * 1000 + + def update_counter(desc, last_time): + curr_time = self._get_event_counters.update(desc, last_time) + sql_getevents_timer.inc_by(curr_time - last_time, desc) + return curr_time + + d = json.loads(js) + start_time = update_counter("decode_json", start_time) + + internal_metadata = json.loads(internal_metadata) + start_time = update_counter("decode_internal", start_time) + + if rejected_reason: + rejected_reason = yield self._simple_select_one_onecol( + desc="_get_event_from_row", + table="rejections", + keyvalues={"event_id": rejected_reason}, + retcol="reason", + ) + + ev = FrozenEvent( + d, + internal_metadata_dict=internal_metadata, + rejected_reason=rejected_reason, + ) + start_time = update_counter("build_frozen_event", start_time) + + if check_redacted and redacted: + ev = prune_event(ev) + + redaction_id = yield self._simple_select_one_onecol( + desc="_get_event_from_row", + table="redactions", + keyvalues={"redacts": ev.event_id}, + retcol="event_id", + ) + + ev.unsigned["redacted_by"] = redaction_id + # Get the redaction event. + + because = yield self.get_event_txn( + redaction_id, + check_redacted=False + ) + + if because: + ev.unsigned["redacted_because"] = because + start_time = update_counter("redact_event", start_time) + + if get_prev_content and "replaces_state" in ev.unsigned: + prev = yield self.get_event( + ev.unsigned["replaces_state"], + get_prev_content=False, + ) + if prev: + ev.unsigned["prev_content"] = prev.get_dict()["content"] + start_time = update_counter("get_prev_content", start_time) + + defer.returnValue(ev) + def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, check_redacted=True, get_prev_content=False, rejected_reason=None): diff --git a/synapse/storage/state.py b/synapse/storage/state.py index a80a947436..483b316e9f 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -85,13 +85,13 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def c(vals): - vals[:] = yield self.runInteraction( - "_get_state_groups_ev", - self._fetch_events_txn, vals - ) + vals[:] = yield self._fetch_events(vals, get_prev_content=False) yield defer.gatherResults( - [c(vals) for vals in states.values()], + [ + c(vals) + for vals in states.values() + ], consumeErrors=True, ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 8045e17fd7..db9c2f0389 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -298,6 +298,7 @@ class StreamStore(SQLBaseStore): return self.runInteraction("paginate_room_events", f) + @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, end_token, with_feedback=False, from_token=None): # TODO (erikj): Handle compressed feedback @@ -349,20 +350,21 @@ class StreamStore(SQLBaseStore): else: token = (str(end_token), str(end_token)) - events = self._get_events_txn( - txn, - [r["event_id"] for r in rows], - get_prev_content=True - ) + return rows, token - self._set_before_and_after(events, rows) - - return events, token - - return self.runInteraction( + rows, token = yield self.runInteraction( "get_recent_events_for_room", get_recent_events_for_room_txn ) + events = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) + + self._set_before_and_after(events, rows) + + defer.returnValue((events, token)) + @defer.inlineCallbacks def get_room_events_max_id(self, direction='f'): token = yield self._stream_id_gen.get_max_token(self) |