From 409bcc76bdbdb5410d755b1eded370491641976f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 11:13:31 +0100 Subject: Load events for state group seperately --- synapse/storage/state.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index dbc0e49c1f..9ed5412999 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -43,6 +43,7 @@ class StateStore(SQLBaseStore): * `state_groups_state`: Maps state group to state events. """ + @defer.inlineCallbacks def get_state_groups(self, event_ids): """ Get the state groups for the given list of event_ids @@ -71,17 +72,22 @@ class StateStore(SQLBaseStore): retcol="event_id", ) - state = self._get_events_txn(txn, state_ids) + # state = self._get_events_txn(txn, state_ids) - res[group] = state + res[group] = state_ids return res - return self.runInteraction( + states = yield self.runInteraction( "get_state_groups", f, ) + for vals in states.values(): + vals[:] = yield self._get_events(vals, desc="_get_state_groups_ev") + + defer.returnValue(states) + def _store_state_groups_txn(self, txn, event, context): if context.current_state is None: return -- cgit 1.4.1 From fec4485e28569718b9a0c341be4aaead8533c280 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 11:22:42 +0100 Subject: Batch fetching of events for state groups --- synapse/storage/state.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 9ed5412999..c300c6e29c 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -83,8 +83,31 @@ class StateStore(SQLBaseStore): f, ) + def fetch_events(txn, events): + sql = ( + "SELECT e.internal_metadata, e.json, r.event_id, rej.reason " + " FROM event_json as e" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " LEFT JOIN rejections as rej on rej.event_id = e.event_id" + " WHERE e.event_id IN (%s)" + ) % (",".join(["?"]*len(events)),) + + txn.execute(sql, events) + rows = txn.fetchall() + + return [ + self._get_event_from_row_txn( + txn, row[0], row[1], row[2], + rejected_reason=row[3], + ) + for row in rows + ] + for vals in states.values(): - vals[:] = yield self._get_events(vals, desc="_get_state_groups_ev") + vals[:] = yield self.runInteraction( + "_get_state_groups_ev", + fetch_events, vals + ) defer.returnValue(states) -- cgit 1.4.1 From 619a21812be7872832865372587e98ed9e690184 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 11:29:03 +0100 Subject: defer.gatherResults loop --- synapse/storage/state.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index c300c6e29c..2b5c2d999d 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -103,12 +103,18 @@ class StateStore(SQLBaseStore): for row in rows ] - for vals in states.values(): + @defer.inlineCallbacks + def c(vals): vals[:] = yield self.runInteraction( "_get_state_groups_ev", fetch_events, vals ) + yield defer.gatherResults( + [c(vals) for vals in states.values()], + consumeErrors=True, + ) + defer.returnValue(states) def _store_state_groups_txn(self, txn, event, context): -- cgit 1.4.1 From 02590c3e1db79c2c8f158a73562d139ce411d5d3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 11:31:28 +0100 
Subject: Temp turn off checking for rejections and redactions --- synapse/storage/state.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 2b5c2d999d..6d7d576cb6 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -85,10 +85,8 @@ class StateStore(SQLBaseStore): def fetch_events(txn, events): sql = ( - "SELECT e.internal_metadata, e.json, r.event_id, rej.reason " + "SELECT e.internal_metadata, e.json " " FROM event_json as e" - " LEFT JOIN redactions as r ON e.event_id = r.redacts" - " LEFT JOIN rejections as rej on rej.event_id = e.event_id" " WHERE e.event_id IN (%s)" ) % (",".join(["?"]*len(events)),) @@ -97,8 +95,7 @@ class StateStore(SQLBaseStore): return [ self._get_event_from_row_txn( - txn, row[0], row[1], row[2], - rejected_reason=row[3], + txn, row[0], row[1], None ) for row in rows ] -- cgit 1.4.1 From 6edff11a888d2c3f7a6749599fcb9d4974a76bbb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 14:39:05 +0100 Subject: Don't fetch redaction and rejection stuff for each event, so we can use index only scan --- synapse/storage/_base.py | 23 +++++++++++++++++++---- synapse/storage/state.py | 7 +++++-- 2 files changed, 24 insertions(+), 6 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 0279400a83..a6fc4d6ea4 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -918,10 +918,10 @@ class SQLBaseStore(object): start_time = update_counter("event_cache", start_time) sql = ( - "SELECT e.internal_metadata, e.json, r.event_id, rej.reason " + "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " "FROM event_json as e " + "LEFT JOIN rejections as rej USING (event_id) " "LEFT JOIN redactions as r ON e.event_id = r.redacts " - "LEFT JOIN rejections as rej on rej.event_id = e.event_id " "WHERE e.event_id = ? " "LIMIT 1 " ) @@ -967,6 +967,14 @@ class SQLBaseStore(object): internal_metadata = json.loads(internal_metadata) start_time = update_counter("decode_internal", start_time) + if rejected_reason: + rejected_reason = self._simple_select_one_onecol_txn( + txn, + table="rejections", + keyvalues={"event_id": rejected_reason}, + retcol="reason", + ) + ev = FrozenEvent( d, internal_metadata_dict=internal_metadata, @@ -977,12 +985,19 @@ class SQLBaseStore(object): if check_redacted and redacted: ev = prune_event(ev) - ev.unsigned["redacted_by"] = redacted + redaction_id = self._simple_select_one_onecol_txn( + txn, + table="redactions", + keyvalues={"redacts": ev.event_id}, + retcol="event_id", + ) + + ev.unsigned["redacted_by"] = redaction_id # Get the redaction event. 
because = self._get_event_txn( txn, - redacted, + redaction_id, check_redacted=False ) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 6d7d576cb6..6d0ecf8dd9 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -85,8 +85,10 @@ class StateStore(SQLBaseStore): def fetch_events(txn, events): sql = ( - "SELECT e.internal_metadata, e.json " + "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " " FROM event_json as e" + " LEFT JOIN rejections as rej USING (event_id)" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" " WHERE e.event_id IN (%s)" ) % (",".join(["?"]*len(events)),) @@ -95,7 +97,8 @@ class StateStore(SQLBaseStore): return [ self._get_event_from_row_txn( - txn, row[0], row[1], None + txn, row[0], row[1], row[2], + rejected_reason=row[3], ) for row in rows ] -- cgit 1.4.1 From ca4f458787d4fcccf4d6b240f7497cac4e174bcc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 15:13:42 +0100 Subject: Fetch events in bulk --- synapse/storage/_base.py | 75 +++++++++++++++++++++++++++++++++++++++--------- synapse/storage/state.py | 22 +------------- 2 files changed, 63 insertions(+), 34 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index a6fc4d6ea4..f7b4def9ec 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -875,19 +875,11 @@ class SQLBaseStore(object): def _get_events_txn(self, txn, event_ids, check_redacted=True, get_prev_content=False): - if not event_ids: - return [] - - events = [ - self._get_event_txn( - txn, event_id, - check_redacted=check_redacted, - get_prev_content=get_prev_content - ) - for event_id in event_ids - ] - - return [e for e in events if e] + return self._fetch_events_txn( + txn, event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + ) def _invalidate_get_event_cache(self, event_id): for check_redacted in (False, True): @@ -950,6 +942,63 @@ class SQLBaseStore(object): else: return None + def _fetch_events_txn(self, txn, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): + if not events: + return [] + + event_map = {} + + for event_id in events: + try: + ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) + + if allow_rejected or not ret.rejected_reason: + event_map[event_id] = ret + else: + return None + except KeyError: + pass + + missing_events = [ + e for e in events + if e not in event_map + ] + + if missing_events: + sql = ( + "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " + " FROM event_json as e" + " LEFT JOIN rejections as rej USING (event_id)" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " WHERE e.event_id IN (%s)" + ) % (",".join(["?"]*len(missing_events)),) + + txn.execute(sql, missing_events) + rows = txn.fetchall() + + res = [ + self._get_event_from_row_txn( + txn, row[0], row[1], row[2], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=row[3], + ) + for row in rows + ] + + event_map.update({ + e.event_id: e + for e in res if e + }) + + for e in res: + self._get_event_cache.prefill( + e.event_id, check_redacted, get_prev_content, e + ) + + return [event_map[e_id] for e_id in events if e_id in event_map] + def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, check_redacted=True, get_prev_content=False, rejected_reason=None): diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 6d0ecf8dd9..a80a947436 100644 --- 
a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -83,31 +83,11 @@ class StateStore(SQLBaseStore): f, ) - def fetch_events(txn, events): - sql = ( - "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " - " FROM event_json as e" - " LEFT JOIN rejections as rej USING (event_id)" - " LEFT JOIN redactions as r ON e.event_id = r.redacts" - " WHERE e.event_id IN (%s)" - ) % (",".join(["?"]*len(events)),) - - txn.execute(sql, events) - rows = txn.fetchall() - - return [ - self._get_event_from_row_txn( - txn, row[0], row[1], row[2], - rejected_reason=row[3], - ) - for row in rows - ] - @defer.inlineCallbacks def c(vals): vals[:] = yield self.runInteraction( "_get_state_groups_ev", - fetch_events, vals + self._fetch_events_txn, vals ) yield defer.gatherResults( -- cgit 1.4.1 From 4071f2965320950c7f1bbdd39105f8c34ca95034 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 16:59:41 +0100 Subject: Fetch events from events_id in their own transactions --- synapse/storage/_base.py | 154 +++++++++++++++++++++++++++++++++++++++++++++- synapse/storage/state.py | 10 +-- synapse/storage/stream.py | 22 ++++--- 3 files changed, 168 insertions(+), 18 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b21056f617..f6c1ec424a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -867,11 +867,26 @@ class SQLBaseStore(object): return self.runInteraction("_simple_max_id", func) + @defer.inlineCallbacks def _get_events(self, event_ids, check_redacted=True, get_prev_content=False, desc="_get_events"): - return self.runInteraction( - desc, self._get_events_txn, event_ids, - check_redacted=check_redacted, get_prev_content=get_prev_content, + N = 50 # Only fetch 100 events at a time. 
+ + ds = [ + self.runInteraction( + desc, + self._fetch_events_txn, + event_ids[i*N:(i+1)*N], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + ) + for i in range(1 + len(event_ids) / N) + ] + + res = yield defer.gatherResults(ds, consumeErrors=True) + + defer.returnValue( + list(itertools.chain(*res)) ) def _get_events_txn(self, txn, event_ids, check_redacted=True, @@ -1007,6 +1022,139 @@ class SQLBaseStore(object): if e_id in event_map and event_map[e_id] ] + @defer.inlineCallbacks + def _fetch_events(self, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): + if not events: + defer.returnValue([]) + + event_map = {} + + for event_id in events: + try: + ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) + + if allow_rejected or not ret.rejected_reason: + event_map[event_id] = ret + else: + event_map[event_id] = None + except KeyError: + pass + + missing_events = [ + e for e in events + if e not in event_map + ] + + if missing_events: + sql = ( + "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " + " FROM event_json as e" + " LEFT JOIN rejections as rej USING (event_id)" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " WHERE e.event_id IN (%s)" + ) % (",".join(["?"]*len(missing_events)),) + + rows = yield self._execute( + "_fetch_events", + None, + sql, + *missing_events + ) + + res_ds = [ + self._get_event_from_row( + row[0], row[1], row[2], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=row[3], + ) + for row in rows + ] + + res = yield defer.gatherResults(res_ds, consumeErrors=True) + + event_map.update({ + e.event_id: e + for e in res if e + }) + + for e in res: + self._get_event_cache.prefill( + e.event_id, check_redacted, get_prev_content, e + ) + + defer.returnValue([ + event_map[e_id] for e_id in events + if e_id in event_map and event_map[e_id] + ]) + + @defer.inlineCallbacks + def _get_event_from_row(self, internal_metadata, js, redacted, + check_redacted=True, get_prev_content=False, + rejected_reason=None): + + start_time = time.time() * 1000 + + def update_counter(desc, last_time): + curr_time = self._get_event_counters.update(desc, last_time) + sql_getevents_timer.inc_by(curr_time - last_time, desc) + return curr_time + + d = json.loads(js) + start_time = update_counter("decode_json", start_time) + + internal_metadata = json.loads(internal_metadata) + start_time = update_counter("decode_internal", start_time) + + if rejected_reason: + rejected_reason = yield self._simple_select_one_onecol( + desc="_get_event_from_row", + table="rejections", + keyvalues={"event_id": rejected_reason}, + retcol="reason", + ) + + ev = FrozenEvent( + d, + internal_metadata_dict=internal_metadata, + rejected_reason=rejected_reason, + ) + start_time = update_counter("build_frozen_event", start_time) + + if check_redacted and redacted: + ev = prune_event(ev) + + redaction_id = yield self._simple_select_one_onecol( + desc="_get_event_from_row", + table="redactions", + keyvalues={"redacts": ev.event_id}, + retcol="event_id", + ) + + ev.unsigned["redacted_by"] = redaction_id + # Get the redaction event. 
+ + because = yield self.get_event_txn( + redaction_id, + check_redacted=False + ) + + if because: + ev.unsigned["redacted_because"] = because + start_time = update_counter("redact_event", start_time) + + if get_prev_content and "replaces_state" in ev.unsigned: + prev = yield self.get_event( + ev.unsigned["replaces_state"], + get_prev_content=False, + ) + if prev: + ev.unsigned["prev_content"] = prev.get_dict()["content"] + start_time = update_counter("get_prev_content", start_time) + + defer.returnValue(ev) + def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, check_redacted=True, get_prev_content=False, rejected_reason=None): diff --git a/synapse/storage/state.py b/synapse/storage/state.py index a80a947436..483b316e9f 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -85,13 +85,13 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def c(vals): - vals[:] = yield self.runInteraction( - "_get_state_groups_ev", - self._fetch_events_txn, vals - ) + vals[:] = yield self._fetch_events(vals, get_prev_content=False) yield defer.gatherResults( - [c(vals) for vals in states.values()], + [ + c(vals) + for vals in states.values() + ], consumeErrors=True, ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 8045e17fd7..db9c2f0389 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -298,6 +298,7 @@ class StreamStore(SQLBaseStore): return self.runInteraction("paginate_room_events", f) + @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, end_token, with_feedback=False, from_token=None): # TODO (erikj): Handle compressed feedback @@ -349,20 +350,21 @@ class StreamStore(SQLBaseStore): else: token = (str(end_token), str(end_token)) - events = self._get_events_txn( - txn, - [r["event_id"] for r in rows], - get_prev_content=True - ) + return rows, token - self._set_before_and_after(events, rows) - - return events, token - - return self.runInteraction( + rows, token = yield self.runInteraction( "get_recent_events_for_room", get_recent_events_for_room_txn ) + events = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) + + self._set_before_and_after(events, rows) + + defer.returnValue((events, token)) + @defer.inlineCallbacks def get_room_events_max_id(self, direction='f'): token = yield self._stream_id_gen.get_max_token(self) -- cgit 1.4.1 From cdb3757942fefdcdc3d33b9c6d7c9e44decefd6f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 13:31:55 +0100 Subject: Refactor _get_events --- synapse/storage/_base.py | 346 +++++++++++++---------------------------------- synapse/storage/state.py | 2 +- synapse/util/__init__.py | 28 ++++ 3 files changed, 123 insertions(+), 253 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 46a1c07460..a6c6676f8d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -17,6 +17,7 @@ import logging from synapse.api.errors import StoreError from synapse.events import FrozenEvent from synapse.events.utils import prune_event +from synapse.util import unwrap_deferred from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_context_over_fn, LoggingContext from synapse.util.lrucache import LruCache @@ -28,7 +29,6 @@ from twisted.internet import defer from collections import namedtuple, OrderedDict import functools -import itertools import simplejson as json import sys import time @@ -870,35 +870,43 @@ 
class SQLBaseStore(object): @defer.inlineCallbacks def _get_events(self, event_ids, check_redacted=True, - get_prev_content=False, desc="_get_events"): - N = 50 # Only fetch 100 events at a time. + get_prev_content=False, allow_rejected=False, txn=None): + if not event_ids: + defer.returnValue([]) - ds = [ - self._fetch_events( - event_ids[i*N:(i+1)*N], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - ) - for i in range(1 + len(event_ids) / N) - ] + event_map = self._get_events_from_cache( + event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) - res = yield defer.gatherResults(ds, consumeErrors=True) + missing_events = [e for e in event_ids if e not in event_map] - defer.returnValue( - list(itertools.chain(*res)) + missing_events = yield self._fetch_events( + txn, + missing_events, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, ) + event_map.update(missing_events) + + defer.returnValue([ + event_map[e_id] for e_id in event_ids + if e_id in event_map and event_map[e_id] + ]) + def _get_events_txn(self, txn, event_ids, check_redacted=True, - get_prev_content=False): - N = 50 # Only fetch 100 events at a time. - return list(itertools.chain(*[ - self._fetch_events_txn( - txn, event_ids[i*N:(i+1)*N], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - ) - for i in range(1 + len(event_ids) / N) - ])) + get_prev_content=False, allow_rejected=False): + return unwrap_deferred(self._get_events( + event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + txn=txn, + )) def _invalidate_get_event_cache(self, event_id): for check_redacted in (False, True): @@ -909,68 +917,24 @@ class SQLBaseStore(object): def _get_event_txn(self, txn, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False): - start_time = time.time() * 1000 - - def update_counter(desc, last_time): - curr_time = self._get_event_counters.update(desc, last_time) - sql_getevents_timer.inc_by(curr_time - last_time, desc) - return curr_time - - try: - ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) - - if allow_rejected or not ret.rejected_reason: - return ret - else: - return None - except KeyError: - pass - finally: - start_time = update_counter("event_cache", start_time) - - sql = ( - "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " - "FROM event_json as e " - "LEFT JOIN rejections as rej USING (event_id) " - "LEFT JOIN redactions as r ON e.event_id = r.redacts " - "WHERE e.event_id = ? 
" - "LIMIT 1 " - ) - - txn.execute(sql, (event_id,)) - - res = txn.fetchone() - - if not res: - return None - - internal_metadata, js, redacted, rejected_reason = res - - start_time = update_counter("select_event", start_time) - - result = self._get_event_from_row_txn( - txn, internal_metadata, js, redacted, + events = self._get_events_txn( + txn, [event_id], check_redacted=check_redacted, get_prev_content=get_prev_content, - rejected_reason=rejected_reason, + allow_rejected=allow_rejected, ) - self._get_event_cache.prefill(event_id, check_redacted, get_prev_content, result) - if allow_rejected or not rejected_reason: - return result - else: - return None - - def _fetch_events_txn(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not events: - return [] + return events[0] if events else None + def _get_events_from_cache(self, events, check_redacted, get_prev_content, + allow_rejected): event_map = {} for event_id in events: try: - ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) + ret = self._get_event_cache.get( + event_id, check_redacted, get_prev_content + ) if allow_rejected or not ret.rejected_reason: event_map[event_id] = ret @@ -979,200 +943,81 @@ class SQLBaseStore(object): except KeyError: pass - missing_events = [ - e for e in events - if e not in event_map - ] - - if missing_events: - sql = ( - "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " - " FROM event_json as e" - " LEFT JOIN rejections as rej USING (event_id)" - " LEFT JOIN redactions as r ON e.event_id = r.redacts" - " WHERE e.event_id IN (%s)" - ) % (",".join(["?"]*len(missing_events)),) - - txn.execute(sql, missing_events) - rows = txn.fetchall() - - res = [ - self._get_event_from_row_txn( - txn, row[0], row[1], row[2], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=row[3], - ) - for row in rows - ] - - event_map.update({ - e.event_id: e - for e in res if e - }) - - for e in res: - self._get_event_cache.prefill( - e.event_id, check_redacted, get_prev_content, e - ) - - return [ - event_map[e_id] for e_id in events - if e_id in event_map and event_map[e_id] - ] + return event_map @defer.inlineCallbacks - def _fetch_events(self, events, check_redacted=True, + def _fetch_events(self, txn, events, check_redacted=True, get_prev_content=False, allow_rejected=False): if not events: - defer.returnValue([]) - - event_map = {} + defer.returnValue({}) - for event_id in events: - try: - ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) - - if allow_rejected or not ret.rejected_reason: - event_map[event_id] = ret - else: - event_map[event_id] = None - except KeyError: - pass + rows = [] + N = 2 + for i in range(1 + len(events) / N): + evs = events[i*N:(i + 1)*N] + if not evs: + break - missing_events = [ - e for e in events - if e not in event_map - ] - - if missing_events: sql = ( "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " " FROM event_json as e" " LEFT JOIN rejections as rej USING (event_id)" " LEFT JOIN redactions as r ON e.event_id = r.redacts" " WHERE e.event_id IN (%s)" - ) % (",".join(["?"]*len(missing_events)),) - - rows = yield self._execute( - "_fetch_events", - None, - sql, - *missing_events - ) - - res_ds = [ - self._get_event_from_row( - row[0], row[1], row[2], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=row[3], - ) - for row in rows - ] + ) % (",".join(["?"]*len(evs)),) - res = yield 
defer.gatherResults(res_ds, consumeErrors=True) + if txn: + txn.execute(sql, evs) + rows.extend(txn.fetchall()) + else: + res = yield self._execute("_fetch_events", None, sql, *evs) + rows.extend(res) - event_map.update({ - e.event_id: e - for e in res if e - }) + res = [] + for row in rows: + e = yield self._get_event_from_row( + txn, + row[0], row[1], row[2], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=row[3], + ) + res.append(e) - for e in res: - self._get_event_cache.prefill( - e.event_id, check_redacted, get_prev_content, e - ) + for e in res: + self._get_event_cache.prefill( + e.event_id, check_redacted, get_prev_content, e + ) - defer.returnValue([ - event_map[e_id] for e_id in events - if e_id in event_map and event_map[e_id] - ]) + defer.returnValue({ + e.event_id: e + for e in res if e + }) @defer.inlineCallbacks - def _get_event_from_row(self, internal_metadata, js, redacted, + def _get_event_from_row(self, txn, internal_metadata, js, redacted, check_redacted=True, get_prev_content=False, rejected_reason=None): - - start_time = time.time() * 1000 - - def update_counter(desc, last_time): - curr_time = self._get_event_counters.update(desc, last_time) - sql_getevents_timer.inc_by(curr_time - last_time, desc) - return curr_time - d = json.loads(js) - start_time = update_counter("decode_json", start_time) - internal_metadata = json.loads(internal_metadata) - start_time = update_counter("decode_internal", start_time) - - if rejected_reason: - rejected_reason = yield self._simple_select_one_onecol( - desc="_get_event_from_row", - table="rejections", - keyvalues={"event_id": rejected_reason}, - retcol="reason", - ) - - ev = FrozenEvent( - d, - internal_metadata_dict=internal_metadata, - rejected_reason=rejected_reason, - ) - start_time = update_counter("build_frozen_event", start_time) - - if check_redacted and redacted: - ev = prune_event(ev) - - redaction_id = yield self._simple_select_one_onecol( - desc="_get_event_from_row", - table="redactions", - keyvalues={"redacts": ev.event_id}, - retcol="event_id", - ) - - ev.unsigned["redacted_by"] = redaction_id - # Get the redaction event. 
- because = yield self.get_event_txn( - redaction_id, - check_redacted=False - ) - - if because: - ev.unsigned["redacted_because"] = because - start_time = update_counter("redact_event", start_time) - - if get_prev_content and "replaces_state" in ev.unsigned: - prev = yield self.get_event( - ev.unsigned["replaces_state"], - get_prev_content=False, - ) - if prev: - ev.unsigned["prev_content"] = prev.get_dict()["content"] - start_time = update_counter("get_prev_content", start_time) - - defer.returnValue(ev) - - def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False, - rejected_reason=None): - - start_time = time.time() * 1000 - - def update_counter(desc, last_time): - curr_time = self._get_event_counters.update(desc, last_time) - sql_getevents_timer.inc_by(curr_time - last_time, desc) - return curr_time - - d = json.loads(js) - start_time = update_counter("decode_json", start_time) + def select(txn, *args, **kwargs): + if txn: + return self._simple_select_one_onecol_txn(txn, *args, **kwargs) + else: + return self._simple_select_one_onecol( + *args, + desc="_get_event_from_row", **kwargs + ) - internal_metadata = json.loads(internal_metadata) - start_time = update_counter("decode_internal", start_time) + def get_event(txn, *args, **kwargs): + if txn: + return self._get_event_txn(txn, *args, **kwargs) + else: + return self.get_event(*args, **kwargs) if rejected_reason: - rejected_reason = self._simple_select_one_onecol_txn( + rejected_reason = yield select( txn, table="rejections", keyvalues={"event_id": rejected_reason}, @@ -1184,12 +1029,11 @@ class SQLBaseStore(object): internal_metadata_dict=internal_metadata, rejected_reason=rejected_reason, ) - start_time = update_counter("build_frozen_event", start_time) if check_redacted and redacted: ev = prune_event(ev) - redaction_id = self._simple_select_one_onecol_txn( + redaction_id = yield select( txn, table="redactions", keyvalues={"redacts": ev.event_id}, @@ -1199,7 +1043,7 @@ class SQLBaseStore(object): ev.unsigned["redacted_by"] = redaction_id # Get the redaction event. 
- because = self._get_event_txn( + because = yield get_event( txn, redaction_id, check_redacted=False @@ -1207,19 +1051,17 @@ class SQLBaseStore(object): if because: ev.unsigned["redacted_because"] = because - start_time = update_counter("redact_event", start_time) if get_prev_content and "replaces_state" in ev.unsigned: - prev = self._get_event_txn( + prev = yield get_event( txn, ev.unsigned["replaces_state"], get_prev_content=False, ) if prev: ev.unsigned["prev_content"] = prev.get_dict()["content"] - start_time = update_counter("get_prev_content", start_time) - return ev + defer.returnValue(ev) def _parse_events(self, rows): return self.runInteraction( diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 483b316e9f..26fd3b3e67 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -85,7 +85,7 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def c(vals): - vals[:] = yield self._fetch_events(vals, get_prev_content=False) + vals[:] = yield self._get_events(vals, get_prev_content=False) yield defer.gatherResults( [ diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index c1a16b639a..b9afb3364d 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -29,6 +29,34 @@ def unwrapFirstError(failure): return failure.value.subFailure +def unwrap_deferred(d): + """Given a deferred that we know has completed, return its value or raise + the failure as an exception + """ + if not d.called: + raise RuntimeError("deferred has not finished") + + res = [] + + def f(r): + res.append(r) + return r + d.addCallback(f) + + if res: + return res[0] + + def f(r): + res.append(r) + return r + d.addErrback(f) + + if res: + res[0].raiseException() + else: + raise RuntimeError("deferred did not call callbacks") + + class Clock(object): """A small utility that obtains current time-of-day so that time may be mocked during unit-tests. -- cgit 1.4.1 From f6f902d459c0f888b70742b8f7cca640e544adf6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 13:45:48 +0100 Subject: Move fetching of events into their own transactions --- synapse/storage/event_federation.py | 38 ++++++++++++++++-------------------- synapse/storage/roommember.py | 39 +++++++++++++++++-------------------- synapse/storage/state.py | 2 -- synapse/storage/stream.py | 19 +++++++++--------- 4 files changed, 45 insertions(+), 53 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index a1982dfbb5..5d4b7843f3 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from twisted.internet import defer + from ._base import SQLBaseStore, cached from syutil.base64util import encode_base64 @@ -33,16 +35,7 @@ class EventFederationStore(SQLBaseStore): """ def get_auth_chain(self, event_ids): - return self.runInteraction( - "get_auth_chain", - self._get_auth_chain_txn, - event_ids - ) - - def _get_auth_chain_txn(self, txn, event_ids): - results = self._get_auth_chain_ids_txn(txn, event_ids) - - return self._get_events_txn(txn, results) + return self.get_auth_chain_ids(event_ids).addCallback(self._get_events) def get_auth_chain_ids(self, event_ids): return self.runInteraction( @@ -369,7 +362,7 @@ class EventFederationStore(SQLBaseStore): return self.runInteraction( "get_backfill_events", self._get_backfill_events, room_id, event_list, limit - ) + ).addCallback(self._get_events) def _get_backfill_events(self, txn, room_id, event_list, limit): logger.debug( @@ -415,16 +408,26 @@ class EventFederationStore(SQLBaseStore): front = new_front event_results += new_front - return self._get_events_txn(txn, event_results) + return event_results + @defer.inlineCallbacks def get_missing_events(self, room_id, earliest_events, latest_events, limit, min_depth): - return self.runInteraction( + ids = yield self.runInteraction( "get_missing_events", self._get_missing_events, room_id, earliest_events, latest_events, limit, min_depth ) + events = yield self._get_events(ids) + + events = sorted( + [ev for ev in events if ev.depth >= min_depth], + key=lambda e: e.depth, + ) + + defer.returnValue(events[:limit]) + def _get_missing_events(self, txn, room_id, earliest_events, latest_events, limit, min_depth): @@ -456,14 +459,7 @@ class EventFederationStore(SQLBaseStore): front = new_front event_results |= new_front - events = self._get_events_txn(txn, event_results) - - events = sorted( - [ev for ev in events if ev.depth >= min_depth], - key=lambda e: e.depth, - ) - - return events[:limit] + return event_results def clean_room_for_join(self, room_id): return self.runInteraction( diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 839c74f63a..80717f6cde 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -76,16 +76,16 @@ class RoomMemberStore(SQLBaseStore): Returns: Deferred: Results in a MembershipEvent or None. """ - def f(txn): - events = self._get_members_events_txn( - txn, - room_id, - user_id=user_id, - ) - - return events[0] if events else None - - return self.runInteraction("get_room_member", f) + return self.runInteraction( + "get_room_member", + self._get_members_events_txn, + room_id, + user_id=user_id, + ).addCallback( + self._get_events + ).addCallback( + lambda events: events[0] if events else None + ) def get_users_in_room(self, room_id): def f(txn): @@ -110,15 +110,12 @@ class RoomMemberStore(SQLBaseStore): Returns: list of namedtuples representing the members in this room. 
""" - - def f(txn): - return self._get_members_events_txn( - txn, - room_id, - membership=membership, - ) - - return self.runInteraction("get_room_members", f) + return self.runInteraction( + "get_room_members", + self._get_members_events_txn, + room_id, + membership=membership, + ).addCallback(self._get_events) def get_rooms_for_user_where_membership_is(self, user_id, membership_list): """ Get all the rooms for this user where the membership for this user @@ -190,14 +187,14 @@ class RoomMemberStore(SQLBaseStore): return self.runInteraction( "get_members_query", self._get_members_events_txn, where_clause, where_values - ) + ).addCallbacks(self._get_events) def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None): rows = self._get_members_rows_txn( txn, room_id, membership, user_id, ) - return self._get_events_txn(txn, [r["event_id"] for r in rows]) + return [r["event_id"] for r in rows] def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None): where_clause = "c.room_id = ?" diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 26fd3b3e67..3f5642642d 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -72,8 +72,6 @@ class StateStore(SQLBaseStore): retcol="event_id", ) - # state = self._get_events_txn(txn, state_ids) - res[group] = state_ids return res diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index db9c2f0389..d16b57c515 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -224,7 +224,7 @@ class StreamStore(SQLBaseStore): return self.runInteraction("get_room_events_stream", f) - @log_function + @defer.inlineCallbacks def paginate_room_events(self, room_id, from_key, to_key=None, direction='b', limit=-1, with_feedback=False): @@ -286,17 +286,18 @@ class StreamStore(SQLBaseStore): # TODO (erikj): We should work out what to do here instead. 
next_token = to_key if to_key else from_key - events = self._get_events_txn( - txn, - [r["event_id"] for r in rows], - get_prev_content=True - ) + return rows, next_token, - self._set_before_and_after(events, rows) + rows, token = yield self.runInteraction("paginate_room_events", f) - return events, next_token, + events = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) - return self.runInteraction("paginate_room_events", f) + self._set_before_and_after(events, rows) + + defer.returnValue((events, token)) @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, end_token, -- cgit 1.4.1 From acb12cc811d7ce7cb3c5b6544ed28f7d6592ef33 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 15:20:05 +0100 Subject: Make store.get_current_state fetch events asyncly --- synapse/storage/events.py | 1 - synapse/storage/state.py | 7 ++++--- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3cf2f7cff8..066e1aab75 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -594,7 +594,6 @@ class EventsStore(SQLBaseStore): ], consumeErrors=True ) - logger.debug("gatherResults after") defer.returnValue({ e.event_id: e diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 3f5642642d..b3f2a4dfa1 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -159,11 +159,12 @@ class StateStore(SQLBaseStore): args = (room_id, ) txn.execute(sql, args) - results = self.cursor_to_dict(txn) + results = txn.fetchall() - return self._parse_events_txn(txn, results) + return [r[0] for r in results] - events = yield self.runInteraction("get_current_state", f) + event_ids = yield self.runInteraction("get_current_state", f) + events = yield self._get_events(event_ids) defer.returnValue(events) -- cgit 1.4.1 From 8763dd80efd19d562a97a8d5af59b85bc3678d46 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 15:33:01 +0100 Subject: Don't fetch prev_content for current_state --- synapse/storage/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index b3f2a4dfa1..56f0572f7e 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -164,7 +164,7 @@ class StateStore(SQLBaseStore): return [r[0] for r in results] event_ids = yield self.runInteraction("get_current_state", f) - events = yield self._get_events(event_ids) + events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) -- cgit 1.4.1