From 409bcc76bdbdb5410d755b1eded370491641976f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 11:13:31 +0100 Subject: Load events for state group seperately --- synapse/storage/_base.py | 4 ++-- synapse/storage/state.py | 12 +++++++++--- 2 files changed, 11 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c9fe5a3555..0279400a83 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -867,9 +867,9 @@ class SQLBaseStore(object): return self.runInteraction("_simple_max_id", func) def _get_events(self, event_ids, check_redacted=True, - get_prev_content=False): + get_prev_content=False, desc="_get_events"): return self.runInteraction( - "_get_events", self._get_events_txn, event_ids, + desc, self._get_events_txn, event_ids, check_redacted=check_redacted, get_prev_content=get_prev_content, ) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index dbc0e49c1f..9ed5412999 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -43,6 +43,7 @@ class StateStore(SQLBaseStore): * `state_groups_state`: Maps state group to state events. """ + @defer.inlineCallbacks def get_state_groups(self, event_ids): """ Get the state groups for the given list of event_ids @@ -71,17 +72,22 @@ class StateStore(SQLBaseStore): retcol="event_id", ) - state = self._get_events_txn(txn, state_ids) + # state = self._get_events_txn(txn, state_ids) - res[group] = state + res[group] = state_ids return res - return self.runInteraction( + states = yield self.runInteraction( "get_state_groups", f, ) + for vals in states.values(): + vals[:] = yield self._get_events(vals, desc="_get_state_groups_ev") + + defer.returnValue(states) + def _store_state_groups_txn(self, txn, event, context): if context.current_state is None: return -- cgit 1.4.1 From fec4485e28569718b9a0c341be4aaead8533c280 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 11:22:42 +0100 Subject: Batch fetching of events for state groups --- synapse/storage/state.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 9ed5412999..c300c6e29c 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -83,8 +83,31 @@ class StateStore(SQLBaseStore): f, ) + def fetch_events(txn, events): + sql = ( + "SELECT e.internal_metadata, e.json, r.event_id, rej.reason " + " FROM event_json as e" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " LEFT JOIN rejections as rej on rej.event_id = e.event_id" + " WHERE e.event_id IN (%s)" + ) % (",".join(["?"]*len(events)),) + + txn.execute(sql, events) + rows = txn.fetchall() + + return [ + self._get_event_from_row_txn( + txn, row[0], row[1], row[2], + rejected_reason=row[3], + ) + for row in rows + ] + for vals in states.values(): - vals[:] = yield self._get_events(vals, desc="_get_state_groups_ev") + vals[:] = yield self.runInteraction( + "_get_state_groups_ev", + fetch_events, vals + ) defer.returnValue(states) -- cgit 1.4.1 From 619a21812be7872832865372587e98ed9e690184 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 11:29:03 +0100 Subject: defer.gatherResults loop --- synapse/storage/state.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index c300c6e29c..2b5c2d999d 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -103,12 +103,18 @@ class StateStore(SQLBaseStore): for row in rows ] - for vals in states.values(): + @defer.inlineCallbacks + def c(vals): vals[:] = yield self.runInteraction( "_get_state_groups_ev", fetch_events, vals ) + yield defer.gatherResults( + [c(vals) for vals in states.values()], + consumeErrors=True, + ) + defer.returnValue(states) def _store_state_groups_txn(self, txn, event, context): -- cgit 1.4.1 From 02590c3e1db79c2c8f158a73562d139ce411d5d3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 11:31:28 +0100 Subject: Temp turn off checking for rejections and redactions --- synapse/storage/state.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 2b5c2d999d..6d7d576cb6 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -85,10 +85,8 @@ class StateStore(SQLBaseStore): def fetch_events(txn, events): sql = ( - "SELECT e.internal_metadata, e.json, r.event_id, rej.reason " + "SELECT e.internal_metadata, e.json " " FROM event_json as e" - " LEFT JOIN redactions as r ON e.event_id = r.redacts" - " LEFT JOIN rejections as rej on rej.event_id = e.event_id" " WHERE e.event_id IN (%s)" ) % (",".join(["?"]*len(events)),) @@ -97,8 +95,7 @@ class StateStore(SQLBaseStore): return [ self._get_event_from_row_txn( - txn, row[0], row[1], row[2], - rejected_reason=row[3], + txn, row[0], row[1], None ) for row in rows ] -- cgit 1.4.1 From 6edff11a888d2c3f7a6749599fcb9d4974a76bbb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 14:39:05 +0100 Subject: Don't fetch redaction and rejection stuff for each event, so we can use index only scan --- synapse/storage/_base.py | 23 +++++++++++++++++++---- synapse/storage/state.py | 7 +++++-- 2 files changed, 24 insertions(+), 6 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 0279400a83..a6fc4d6ea4 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -918,10 +918,10 @@ class SQLBaseStore(object): start_time = update_counter("event_cache", start_time) sql = ( - "SELECT e.internal_metadata, e.json, r.event_id, rej.reason " + "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " "FROM event_json as e " + "LEFT JOIN rejections as rej USING (event_id) " "LEFT JOIN redactions as r ON e.event_id = r.redacts " - "LEFT JOIN rejections as rej on rej.event_id = e.event_id " "WHERE e.event_id = ? " "LIMIT 1 " ) @@ -967,6 +967,14 @@ class SQLBaseStore(object): internal_metadata = json.loads(internal_metadata) start_time = update_counter("decode_internal", start_time) + if rejected_reason: + rejected_reason = self._simple_select_one_onecol_txn( + txn, + table="rejections", + keyvalues={"event_id": rejected_reason}, + retcol="reason", + ) + ev = FrozenEvent( d, internal_metadata_dict=internal_metadata, @@ -977,12 +985,19 @@ class SQLBaseStore(object): if check_redacted and redacted: ev = prune_event(ev) - ev.unsigned["redacted_by"] = redacted + redaction_id = self._simple_select_one_onecol_txn( + txn, + table="redactions", + keyvalues={"redacts": ev.event_id}, + retcol="event_id", + ) + + ev.unsigned["redacted_by"] = redaction_id # Get the redaction event. because = self._get_event_txn( txn, - redacted, + redaction_id, check_redacted=False ) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 6d7d576cb6..6d0ecf8dd9 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -85,8 +85,10 @@ class StateStore(SQLBaseStore): def fetch_events(txn, events): sql = ( - "SELECT e.internal_metadata, e.json " + "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " " FROM event_json as e" + " LEFT JOIN rejections as rej USING (event_id)" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" " WHERE e.event_id IN (%s)" ) % (",".join(["?"]*len(events)),) @@ -95,7 +97,8 @@ class StateStore(SQLBaseStore): return [ self._get_event_from_row_txn( - txn, row[0], row[1], None + txn, row[0], row[1], row[2], + rejected_reason=row[3], ) for row in rows ] -- cgit 1.4.1 From ca4f458787d4fcccf4d6b240f7497cac4e174bcc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 15:13:42 +0100 Subject: Fetch events in bulk --- synapse/storage/_base.py | 75 +++++++++++++++++++++++++++++++++++++++--------- synapse/storage/state.py | 22 +------------- 2 files changed, 63 insertions(+), 34 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index a6fc4d6ea4..f7b4def9ec 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -875,19 +875,11 @@ class SQLBaseStore(object): def _get_events_txn(self, txn, event_ids, check_redacted=True, get_prev_content=False): - if not event_ids: - return [] - - events = [ - self._get_event_txn( - txn, event_id, - check_redacted=check_redacted, - get_prev_content=get_prev_content - ) - for event_id in event_ids - ] - - return [e for e in events if e] + return self._fetch_events_txn( + txn, event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + ) def _invalidate_get_event_cache(self, event_id): for check_redacted in (False, True): @@ -950,6 +942,63 @@ class SQLBaseStore(object): else: return None + def _fetch_events_txn(self, txn, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): + if not events: + return [] + + event_map = {} + + for event_id in events: + try: + ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) + + if allow_rejected or not ret.rejected_reason: + event_map[event_id] = ret + else: + return None + except KeyError: + pass + + missing_events = [ + e for e in events + if e not in event_map + ] + + if missing_events: + sql = ( + "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " + " FROM event_json as e" + " LEFT JOIN rejections as rej USING (event_id)" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " WHERE e.event_id IN (%s)" + ) % (",".join(["?"]*len(missing_events)),) + + txn.execute(sql, missing_events) + rows = txn.fetchall() + + res = [ + self._get_event_from_row_txn( + txn, row[0], row[1], row[2], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=row[3], + ) + for row in rows + ] + + event_map.update({ + e.event_id: e + for e in res if e + }) + + for e in res: + self._get_event_cache.prefill( + e.event_id, check_redacted, get_prev_content, e + ) + + return [event_map[e_id] for e_id in events if e_id in event_map] + def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, check_redacted=True, get_prev_content=False, rejected_reason=None): diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 6d0ecf8dd9..a80a947436 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -83,31 +83,11 @@ class StateStore(SQLBaseStore): f, ) - def fetch_events(txn, events): - sql = ( - "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " - " FROM event_json as e" - " LEFT JOIN rejections as rej USING (event_id)" - " LEFT JOIN redactions as r ON e.event_id = r.redacts" - " WHERE e.event_id IN (%s)" - ) % (",".join(["?"]*len(events)),) - - txn.execute(sql, events) - rows = txn.fetchall() - - return [ - self._get_event_from_row_txn( - txn, row[0], row[1], row[2], - rejected_reason=row[3], - ) - for row in rows - ] - @defer.inlineCallbacks def c(vals): vals[:] = yield self.runInteraction( "_get_state_groups_ev", - fetch_events, vals + self._fetch_events_txn, vals ) yield defer.gatherResults( -- cgit 1.4.1 From 5971d240d458dbea23abed803aa3d7bf31c2efce Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 15:26:49 +0100 Subject: Limit batch size --- synapse/storage/_base.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index f7b4def9ec..f169884d83 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -28,6 +28,7 @@ from twisted.internet import defer from collections import namedtuple, OrderedDict import functools +import itertools import simplejson as json import sys import time @@ -875,11 +876,15 @@ class SQLBaseStore(object): def _get_events_txn(self, txn, event_ids, check_redacted=True, get_prev_content=False): - return self._fetch_events_txn( - txn, event_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - ) + N = 50 # Only fetch 100 events at a time. + return list(itertools.chain(*[ + self._fetch_events_txn( + txn, event_ids[i*N:(i+1)*N], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + ) + for i in range(1 + len(event_ids) / N) + ])) def _invalidate_get_event_cache(self, event_id): for check_redacted in (False, True): -- cgit 1.4.1 From cf706cc6ef1ef864400fc265b1c7014e2feeabb0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 15:31:25 +0100 Subject: Don't return None --- synapse/storage/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index f169884d83..f990b8149e 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -961,7 +961,7 @@ class SQLBaseStore(object): if allow_rejected or not ret.rejected_reason: event_map[event_id] = ret else: - return None + event_map[event_id] = None except KeyError: pass -- cgit 1.4.1 From 8888982db3dbf839003fb544072348f73609724f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 15:43:32 +0100 Subject: Don't insert None --- synapse/storage/_base.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index f990b8149e..bb80108a30 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -1002,7 +1002,10 @@ class SQLBaseStore(object): e.event_id, check_redacted, get_prev_content, e ) - return [event_map[e_id] for e_id in events if e_id in event_map] + return [ + event_map[e_id] for e_id in events + if e_id in event_map and event_id[e_id] + ] def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, check_redacted=True, get_prev_content=False, -- cgit 1.4.1 From a988361aea45e77ed94952e16a2c96fd0cb1d362 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 15:44:15 +0100 Subject: Typo --- synapse/storage/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index bb80108a30..b21056f617 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -1004,7 +1004,7 @@ class SQLBaseStore(object): return [ event_map[e_id] for e_id in events - if e_id in event_map and event_id[e_id] + if e_id in event_map and event_map[e_id] ] def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, -- cgit 1.4.1 From 4071f2965320950c7f1bbdd39105f8c34ca95034 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 16:59:41 +0100 Subject: Fetch events from events_id in their own transactions --- synapse/storage/_base.py | 154 +++++++++++++++++++++++++++++++++++++++++++++- synapse/storage/state.py | 10 +-- synapse/storage/stream.py | 22 ++++--- 3 files changed, 168 insertions(+), 18 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b21056f617..f6c1ec424a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -867,11 +867,26 @@ class SQLBaseStore(object): return self.runInteraction("_simple_max_id", func) + @defer.inlineCallbacks def _get_events(self, event_ids, check_redacted=True, get_prev_content=False, desc="_get_events"): - return self.runInteraction( - desc, self._get_events_txn, event_ids, - check_redacted=check_redacted, get_prev_content=get_prev_content, + N = 50 # Only fetch 100 events at a time. + + ds = [ + self.runInteraction( + desc, + self._fetch_events_txn, + event_ids[i*N:(i+1)*N], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + ) + for i in range(1 + len(event_ids) / N) + ] + + res = yield defer.gatherResults(ds, consumeErrors=True) + + defer.returnValue( + list(itertools.chain(*res)) ) def _get_events_txn(self, txn, event_ids, check_redacted=True, @@ -1007,6 +1022,139 @@ class SQLBaseStore(object): if e_id in event_map and event_map[e_id] ] + @defer.inlineCallbacks + def _fetch_events(self, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): + if not events: + defer.returnValue([]) + + event_map = {} + + for event_id in events: + try: + ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) + + if allow_rejected or not ret.rejected_reason: + event_map[event_id] = ret + else: + event_map[event_id] = None + except KeyError: + pass + + missing_events = [ + e for e in events + if e not in event_map + ] + + if missing_events: + sql = ( + "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " + " FROM event_json as e" + " LEFT JOIN rejections as rej USING (event_id)" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " WHERE e.event_id IN (%s)" + ) % (",".join(["?"]*len(missing_events)),) + + rows = yield self._execute( + "_fetch_events", + None, + sql, + *missing_events + ) + + res_ds = [ + self._get_event_from_row( + row[0], row[1], row[2], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=row[3], + ) + for row in rows + ] + + res = yield defer.gatherResults(res_ds, consumeErrors=True) + + event_map.update({ + e.event_id: e + for e in res if e + }) + + for e in res: + self._get_event_cache.prefill( + e.event_id, check_redacted, get_prev_content, e + ) + + defer.returnValue([ + event_map[e_id] for e_id in events + if e_id in event_map and event_map[e_id] + ]) + + @defer.inlineCallbacks + def _get_event_from_row(self, internal_metadata, js, redacted, + check_redacted=True, get_prev_content=False, + rejected_reason=None): + + start_time = time.time() * 1000 + + def update_counter(desc, last_time): + curr_time = self._get_event_counters.update(desc, last_time) + sql_getevents_timer.inc_by(curr_time - last_time, desc) + return curr_time + + d = json.loads(js) + start_time = update_counter("decode_json", start_time) + + internal_metadata = json.loads(internal_metadata) + start_time = update_counter("decode_internal", start_time) + + if rejected_reason: + rejected_reason = yield self._simple_select_one_onecol( + desc="_get_event_from_row", + table="rejections", + keyvalues={"event_id": rejected_reason}, + retcol="reason", + ) + + ev = FrozenEvent( + d, + internal_metadata_dict=internal_metadata, + rejected_reason=rejected_reason, + ) + start_time = update_counter("build_frozen_event", start_time) + + if check_redacted and redacted: + ev = prune_event(ev) + + redaction_id = yield self._simple_select_one_onecol( + desc="_get_event_from_row", + table="redactions", + keyvalues={"redacts": ev.event_id}, + retcol="event_id", + ) + + ev.unsigned["redacted_by"] = redaction_id + # Get the redaction event. + + because = yield self.get_event_txn( + redaction_id, + check_redacted=False + ) + + if because: + ev.unsigned["redacted_because"] = because + start_time = update_counter("redact_event", start_time) + + if get_prev_content and "replaces_state" in ev.unsigned: + prev = yield self.get_event( + ev.unsigned["replaces_state"], + get_prev_content=False, + ) + if prev: + ev.unsigned["prev_content"] = prev.get_dict()["content"] + start_time = update_counter("get_prev_content", start_time) + + defer.returnValue(ev) + def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, check_redacted=True, get_prev_content=False, rejected_reason=None): diff --git a/synapse/storage/state.py b/synapse/storage/state.py index a80a947436..483b316e9f 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -85,13 +85,13 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def c(vals): - vals[:] = yield self.runInteraction( - "_get_state_groups_ev", - self._fetch_events_txn, vals - ) + vals[:] = yield self._fetch_events(vals, get_prev_content=False) yield defer.gatherResults( - [c(vals) for vals in states.values()], + [ + c(vals) + for vals in states.values() + ], consumeErrors=True, ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 8045e17fd7..db9c2f0389 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -298,6 +298,7 @@ class StreamStore(SQLBaseStore): return self.runInteraction("paginate_room_events", f) + @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, end_token, with_feedback=False, from_token=None): # TODO (erikj): Handle compressed feedback @@ -349,20 +350,21 @@ class StreamStore(SQLBaseStore): else: token = (str(end_token), str(end_token)) - events = self._get_events_txn( - txn, - [r["event_id"] for r in rows], - get_prev_content=True - ) + return rows, token - self._set_before_and_after(events, rows) - - return events, token - - return self.runInteraction( + rows, token = yield self.runInteraction( "get_recent_events_for_room", get_recent_events_for_room_txn ) + events = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) + + self._set_before_and_after(events, rows) + + defer.returnValue((events, token)) + @defer.inlineCallbacks def get_room_events_max_id(self, direction='f'): token = yield self._stream_id_gen.get_max_token(self) -- cgit 1.4.1 From 968b01a91a759e4137dbbd5e2537abefe3b1ccbe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 17:02:46 +0100 Subject: Actually use async method --- synapse/storage/_base.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index f6c1ec424a..0ba12d48a5 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -873,9 +873,7 @@ class SQLBaseStore(object): N = 50 # Only fetch 100 events at a time. ds = [ - self.runInteraction( - desc, - self._fetch_events_txn, + self._fetch_events( event_ids[i*N:(i+1)*N], check_redacted=check_redacted, get_prev_content=get_prev_content, -- cgit 1.4.1 From 4f1d984e56e795a35a9bdb691e58f2b6f90e25f9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 May 2015 17:22:26 +0100 Subject: Add index on events --- synapse/storage/__init__.py | 2 +- synapse/storage/schema/delta/19/event_index.sql | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/19/event_index.sql (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 7cb91a0be9..75af44d787 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -51,7 +51,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 18 +SCHEMA_VERSION = 19 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/19/event_index.sql b/synapse/storage/schema/delta/19/event_index.sql new file mode 100644 index 0000000000..f3792817bb --- /dev/null +++ b/synapse/storage/schema/delta/19/event_index.sql @@ -0,0 +1,19 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE INDEX events_order_topo_stream_room ON events( + topological_ordering, stream_ordering, room_id +); \ No newline at end of file -- cgit 1.4.1 From cdb3757942fefdcdc3d33b9c6d7c9e44decefd6f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 13:31:55 +0100 Subject: Refactor _get_events --- synapse/storage/_base.py | 346 +++++++++++++---------------------------------- synapse/storage/state.py | 2 +- synapse/util/__init__.py | 28 ++++ 3 files changed, 123 insertions(+), 253 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 46a1c07460..a6c6676f8d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -17,6 +17,7 @@ import logging from synapse.api.errors import StoreError from synapse.events import FrozenEvent from synapse.events.utils import prune_event +from synapse.util import unwrap_deferred from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_context_over_fn, LoggingContext from synapse.util.lrucache import LruCache @@ -28,7 +29,6 @@ from twisted.internet import defer from collections import namedtuple, OrderedDict import functools -import itertools import simplejson as json import sys import time @@ -870,35 +870,43 @@ class SQLBaseStore(object): @defer.inlineCallbacks def _get_events(self, event_ids, check_redacted=True, - get_prev_content=False, desc="_get_events"): - N = 50 # Only fetch 100 events at a time. + get_prev_content=False, allow_rejected=False, txn=None): + if not event_ids: + defer.returnValue([]) - ds = [ - self._fetch_events( - event_ids[i*N:(i+1)*N], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - ) - for i in range(1 + len(event_ids) / N) - ] + event_map = self._get_events_from_cache( + event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) - res = yield defer.gatherResults(ds, consumeErrors=True) + missing_events = [e for e in event_ids if e not in event_map] - defer.returnValue( - list(itertools.chain(*res)) + missing_events = yield self._fetch_events( + txn, + missing_events, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, ) + event_map.update(missing_events) + + defer.returnValue([ + event_map[e_id] for e_id in event_ids + if e_id in event_map and event_map[e_id] + ]) + def _get_events_txn(self, txn, event_ids, check_redacted=True, - get_prev_content=False): - N = 50 # Only fetch 100 events at a time. - return list(itertools.chain(*[ - self._fetch_events_txn( - txn, event_ids[i*N:(i+1)*N], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - ) - for i in range(1 + len(event_ids) / N) - ])) + get_prev_content=False, allow_rejected=False): + return unwrap_deferred(self._get_events( + event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + txn=txn, + )) def _invalidate_get_event_cache(self, event_id): for check_redacted in (False, True): @@ -909,68 +917,24 @@ class SQLBaseStore(object): def _get_event_txn(self, txn, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False): - start_time = time.time() * 1000 - - def update_counter(desc, last_time): - curr_time = self._get_event_counters.update(desc, last_time) - sql_getevents_timer.inc_by(curr_time - last_time, desc) - return curr_time - - try: - ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) - - if allow_rejected or not ret.rejected_reason: - return ret - else: - return None - except KeyError: - pass - finally: - start_time = update_counter("event_cache", start_time) - - sql = ( - "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " - "FROM event_json as e " - "LEFT JOIN rejections as rej USING (event_id) " - "LEFT JOIN redactions as r ON e.event_id = r.redacts " - "WHERE e.event_id = ? " - "LIMIT 1 " - ) - - txn.execute(sql, (event_id,)) - - res = txn.fetchone() - - if not res: - return None - - internal_metadata, js, redacted, rejected_reason = res - - start_time = update_counter("select_event", start_time) - - result = self._get_event_from_row_txn( - txn, internal_metadata, js, redacted, + events = self._get_events_txn( + txn, [event_id], check_redacted=check_redacted, get_prev_content=get_prev_content, - rejected_reason=rejected_reason, + allow_rejected=allow_rejected, ) - self._get_event_cache.prefill(event_id, check_redacted, get_prev_content, result) - if allow_rejected or not rejected_reason: - return result - else: - return None - - def _fetch_events_txn(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not events: - return [] + return events[0] if events else None + def _get_events_from_cache(self, events, check_redacted, get_prev_content, + allow_rejected): event_map = {} for event_id in events: try: - ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) + ret = self._get_event_cache.get( + event_id, check_redacted, get_prev_content + ) if allow_rejected or not ret.rejected_reason: event_map[event_id] = ret @@ -979,200 +943,81 @@ class SQLBaseStore(object): except KeyError: pass - missing_events = [ - e for e in events - if e not in event_map - ] - - if missing_events: - sql = ( - "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " - " FROM event_json as e" - " LEFT JOIN rejections as rej USING (event_id)" - " LEFT JOIN redactions as r ON e.event_id = r.redacts" - " WHERE e.event_id IN (%s)" - ) % (",".join(["?"]*len(missing_events)),) - - txn.execute(sql, missing_events) - rows = txn.fetchall() - - res = [ - self._get_event_from_row_txn( - txn, row[0], row[1], row[2], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=row[3], - ) - for row in rows - ] - - event_map.update({ - e.event_id: e - for e in res if e - }) - - for e in res: - self._get_event_cache.prefill( - e.event_id, check_redacted, get_prev_content, e - ) - - return [ - event_map[e_id] for e_id in events - if e_id in event_map and event_map[e_id] - ] + return event_map @defer.inlineCallbacks - def _fetch_events(self, events, check_redacted=True, + def _fetch_events(self, txn, events, check_redacted=True, get_prev_content=False, allow_rejected=False): if not events: - defer.returnValue([]) - - event_map = {} + defer.returnValue({}) - for event_id in events: - try: - ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) - - if allow_rejected or not ret.rejected_reason: - event_map[event_id] = ret - else: - event_map[event_id] = None - except KeyError: - pass + rows = [] + N = 2 + for i in range(1 + len(events) / N): + evs = events[i*N:(i + 1)*N] + if not evs: + break - missing_events = [ - e for e in events - if e not in event_map - ] - - if missing_events: sql = ( "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " " FROM event_json as e" " LEFT JOIN rejections as rej USING (event_id)" " LEFT JOIN redactions as r ON e.event_id = r.redacts" " WHERE e.event_id IN (%s)" - ) % (",".join(["?"]*len(missing_events)),) - - rows = yield self._execute( - "_fetch_events", - None, - sql, - *missing_events - ) - - res_ds = [ - self._get_event_from_row( - row[0], row[1], row[2], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=row[3], - ) - for row in rows - ] + ) % (",".join(["?"]*len(evs)),) - res = yield defer.gatherResults(res_ds, consumeErrors=True) + if txn: + txn.execute(sql, evs) + rows.extend(txn.fetchall()) + else: + res = yield self._execute("_fetch_events", None, sql, *evs) + rows.extend(res) - event_map.update({ - e.event_id: e - for e in res if e - }) + res = [] + for row in rows: + e = yield self._get_event_from_row( + txn, + row[0], row[1], row[2], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=row[3], + ) + res.append(e) - for e in res: - self._get_event_cache.prefill( - e.event_id, check_redacted, get_prev_content, e - ) + for e in res: + self._get_event_cache.prefill( + e.event_id, check_redacted, get_prev_content, e + ) - defer.returnValue([ - event_map[e_id] for e_id in events - if e_id in event_map and event_map[e_id] - ]) + defer.returnValue({ + e.event_id: e + for e in res if e + }) @defer.inlineCallbacks - def _get_event_from_row(self, internal_metadata, js, redacted, + def _get_event_from_row(self, txn, internal_metadata, js, redacted, check_redacted=True, get_prev_content=False, rejected_reason=None): - - start_time = time.time() * 1000 - - def update_counter(desc, last_time): - curr_time = self._get_event_counters.update(desc, last_time) - sql_getevents_timer.inc_by(curr_time - last_time, desc) - return curr_time - d = json.loads(js) - start_time = update_counter("decode_json", start_time) - internal_metadata = json.loads(internal_metadata) - start_time = update_counter("decode_internal", start_time) - - if rejected_reason: - rejected_reason = yield self._simple_select_one_onecol( - desc="_get_event_from_row", - table="rejections", - keyvalues={"event_id": rejected_reason}, - retcol="reason", - ) - - ev = FrozenEvent( - d, - internal_metadata_dict=internal_metadata, - rejected_reason=rejected_reason, - ) - start_time = update_counter("build_frozen_event", start_time) - - if check_redacted and redacted: - ev = prune_event(ev) - - redaction_id = yield self._simple_select_one_onecol( - desc="_get_event_from_row", - table="redactions", - keyvalues={"redacts": ev.event_id}, - retcol="event_id", - ) - - ev.unsigned["redacted_by"] = redaction_id - # Get the redaction event. - because = yield self.get_event_txn( - redaction_id, - check_redacted=False - ) - - if because: - ev.unsigned["redacted_because"] = because - start_time = update_counter("redact_event", start_time) - - if get_prev_content and "replaces_state" in ev.unsigned: - prev = yield self.get_event( - ev.unsigned["replaces_state"], - get_prev_content=False, - ) - if prev: - ev.unsigned["prev_content"] = prev.get_dict()["content"] - start_time = update_counter("get_prev_content", start_time) - - defer.returnValue(ev) - - def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False, - rejected_reason=None): - - start_time = time.time() * 1000 - - def update_counter(desc, last_time): - curr_time = self._get_event_counters.update(desc, last_time) - sql_getevents_timer.inc_by(curr_time - last_time, desc) - return curr_time - - d = json.loads(js) - start_time = update_counter("decode_json", start_time) + def select(txn, *args, **kwargs): + if txn: + return self._simple_select_one_onecol_txn(txn, *args, **kwargs) + else: + return self._simple_select_one_onecol( + *args, + desc="_get_event_from_row", **kwargs + ) - internal_metadata = json.loads(internal_metadata) - start_time = update_counter("decode_internal", start_time) + def get_event(txn, *args, **kwargs): + if txn: + return self._get_event_txn(txn, *args, **kwargs) + else: + return self.get_event(*args, **kwargs) if rejected_reason: - rejected_reason = self._simple_select_one_onecol_txn( + rejected_reason = yield select( txn, table="rejections", keyvalues={"event_id": rejected_reason}, @@ -1184,12 +1029,11 @@ class SQLBaseStore(object): internal_metadata_dict=internal_metadata, rejected_reason=rejected_reason, ) - start_time = update_counter("build_frozen_event", start_time) if check_redacted and redacted: ev = prune_event(ev) - redaction_id = self._simple_select_one_onecol_txn( + redaction_id = yield select( txn, table="redactions", keyvalues={"redacts": ev.event_id}, @@ -1199,7 +1043,7 @@ class SQLBaseStore(object): ev.unsigned["redacted_by"] = redaction_id # Get the redaction event. - because = self._get_event_txn( + because = yield get_event( txn, redaction_id, check_redacted=False @@ -1207,19 +1051,17 @@ class SQLBaseStore(object): if because: ev.unsigned["redacted_because"] = because - start_time = update_counter("redact_event", start_time) if get_prev_content and "replaces_state" in ev.unsigned: - prev = self._get_event_txn( + prev = yield get_event( txn, ev.unsigned["replaces_state"], get_prev_content=False, ) if prev: ev.unsigned["prev_content"] = prev.get_dict()["content"] - start_time = update_counter("get_prev_content", start_time) - return ev + defer.returnValue(ev) def _parse_events(self, rows): return self.runInteraction( diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 483b316e9f..26fd3b3e67 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -85,7 +85,7 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def c(vals): - vals[:] = yield self._fetch_events(vals, get_prev_content=False) + vals[:] = yield self._get_events(vals, get_prev_content=False) yield defer.gatherResults( [ diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index c1a16b639a..b9afb3364d 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -29,6 +29,34 @@ def unwrapFirstError(failure): return failure.value.subFailure +def unwrap_deferred(d): + """Given a deferred that we know has completed, return its value or raise + the failure as an exception + """ + if not d.called: + raise RuntimeError("deferred has not finished") + + res = [] + + def f(r): + res.append(r) + return r + d.addCallback(f) + + if res: + return res[0] + + def f(r): + res.append(r) + return r + d.addErrback(f) + + if res: + res[0].raiseException() + else: + raise RuntimeError("deferred did not call callbacks") + + class Clock(object): """A small utility that obtains current time-of-day so that time may be mocked during unit-tests. -- cgit 1.4.1 From f6f902d459c0f888b70742b8f7cca640e544adf6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 13:45:48 +0100 Subject: Move fetching of events into their own transactions --- synapse/storage/event_federation.py | 38 ++++++++++++++++-------------------- synapse/storage/roommember.py | 39 +++++++++++++++++-------------------- synapse/storage/state.py | 2 -- synapse/storage/stream.py | 19 +++++++++--------- 4 files changed, 45 insertions(+), 53 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index a1982dfbb5..5d4b7843f3 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + from ._base import SQLBaseStore, cached from syutil.base64util import encode_base64 @@ -33,16 +35,7 @@ class EventFederationStore(SQLBaseStore): """ def get_auth_chain(self, event_ids): - return self.runInteraction( - "get_auth_chain", - self._get_auth_chain_txn, - event_ids - ) - - def _get_auth_chain_txn(self, txn, event_ids): - results = self._get_auth_chain_ids_txn(txn, event_ids) - - return self._get_events_txn(txn, results) + return self.get_auth_chain_ids(event_ids).addCallback(self._get_events) def get_auth_chain_ids(self, event_ids): return self.runInteraction( @@ -369,7 +362,7 @@ class EventFederationStore(SQLBaseStore): return self.runInteraction( "get_backfill_events", self._get_backfill_events, room_id, event_list, limit - ) + ).addCallback(self._get_events) def _get_backfill_events(self, txn, room_id, event_list, limit): logger.debug( @@ -415,16 +408,26 @@ class EventFederationStore(SQLBaseStore): front = new_front event_results += new_front - return self._get_events_txn(txn, event_results) + return event_results + @defer.inlineCallbacks def get_missing_events(self, room_id, earliest_events, latest_events, limit, min_depth): - return self.runInteraction( + ids = yield self.runInteraction( "get_missing_events", self._get_missing_events, room_id, earliest_events, latest_events, limit, min_depth ) + events = yield self._get_events(ids) + + events = sorted( + [ev for ev in events if ev.depth >= min_depth], + key=lambda e: e.depth, + ) + + defer.returnValue(events[:limit]) + def _get_missing_events(self, txn, room_id, earliest_events, latest_events, limit, min_depth): @@ -456,14 +459,7 @@ class EventFederationStore(SQLBaseStore): front = new_front event_results |= new_front - events = self._get_events_txn(txn, event_results) - - events = sorted( - [ev for ev in events if ev.depth >= min_depth], - key=lambda e: e.depth, - ) - - return events[:limit] + return event_results def clean_room_for_join(self, room_id): return self.runInteraction( diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 839c74f63a..80717f6cde 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -76,16 +76,16 @@ class RoomMemberStore(SQLBaseStore): Returns: Deferred: Results in a MembershipEvent or None. """ - def f(txn): - events = self._get_members_events_txn( - txn, - room_id, - user_id=user_id, - ) - - return events[0] if events else None - - return self.runInteraction("get_room_member", f) + return self.runInteraction( + "get_room_member", + self._get_members_events_txn, + room_id, + user_id=user_id, + ).addCallback( + self._get_events + ).addCallback( + lambda events: events[0] if events else None + ) def get_users_in_room(self, room_id): def f(txn): @@ -110,15 +110,12 @@ class RoomMemberStore(SQLBaseStore): Returns: list of namedtuples representing the members in this room. """ - - def f(txn): - return self._get_members_events_txn( - txn, - room_id, - membership=membership, - ) - - return self.runInteraction("get_room_members", f) + return self.runInteraction( + "get_room_members", + self._get_members_events_txn, + room_id, + membership=membership, + ).addCallback(self._get_events) def get_rooms_for_user_where_membership_is(self, user_id, membership_list): """ Get all the rooms for this user where the membership for this user @@ -190,14 +187,14 @@ class RoomMemberStore(SQLBaseStore): return self.runInteraction( "get_members_query", self._get_members_events_txn, where_clause, where_values - ) + ).addCallbacks(self._get_events) def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None): rows = self._get_members_rows_txn( txn, room_id, membership, user_id, ) - return self._get_events_txn(txn, [r["event_id"] for r in rows]) + return [r["event_id"] for r in rows] def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None): where_clause = "c.room_id = ?" diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 26fd3b3e67..3f5642642d 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -72,8 +72,6 @@ class StateStore(SQLBaseStore): retcol="event_id", ) - # state = self._get_events_txn(txn, state_ids) - res[group] = state_ids return res diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index db9c2f0389..d16b57c515 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -224,7 +224,7 @@ class StreamStore(SQLBaseStore): return self.runInteraction("get_room_events_stream", f) - @log_function + @defer.inlineCallbacks def paginate_room_events(self, room_id, from_key, to_key=None, direction='b', limit=-1, with_feedback=False): @@ -286,17 +286,18 @@ class StreamStore(SQLBaseStore): # TODO (erikj): We should work out what to do here instead. next_token = to_key if to_key else from_key - events = self._get_events_txn( - txn, - [r["event_id"] for r in rows], - get_prev_content=True - ) + return rows, next_token, - self._set_before_and_after(events, rows) + rows, token = yield self.runInteraction("paginate_room_events", f) - return events, next_token, + events = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) - return self.runInteraction("paginate_room_events", f) + self._set_before_and_after(events, rows) + + defer.returnValue((events, token)) @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, end_token, -- cgit 1.4.1 From ab78a8926e508a56ea8a4719f80e6402493ff9e3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 13:47:16 +0100 Subject: Err, we probably want a bigger limit --- synapse/storage/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index a6c6676f8d..5c33bd81a8 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -952,7 +952,7 @@ class SQLBaseStore(object): defer.returnValue({}) rows = [] - N = 2 + N = 200 for i in range(1 + len(events) / N): evs = events[i*N:(i + 1)*N] if not evs: -- cgit 1.4.1 From e1e9f0c5b23e1404ba885b501b6a347346a119ea Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 13:58:49 +0100 Subject: loop -> gatherResults --- synapse/storage/_base.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 5c33bd81a8..015e04a8ce 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -973,16 +973,20 @@ class SQLBaseStore(object): res = yield self._execute("_fetch_events", None, sql, *evs) rows.extend(res) - res = [] - for row in rows: - e = yield self._get_event_from_row( - txn, - row[0], row[1], row[2], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=row[3], - ) - res.append(e) + res = yield defer.gatherResults( + [ + defer.maybeDeferred( + self._get_event_from_row, + txn, + row[0], row[1], row[2], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=row[3], + ) + for row in rows + ], + consumeErrors=True, + ) for e in res: self._get_event_cache.prefill( -- cgit 1.4.1 From 2f7f8e1c2b10b9444a3de7777d3bcc9d19f9d6c8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 14:17:36 +0100 Subject: Preemptively jump into a transaction if we ask for get_prev_content --- synapse/storage/_base.py | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 015e04a8ce..d896f5f918 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -883,20 +883,30 @@ class SQLBaseStore(object): missing_events = [e for e in event_ids if e not in event_map] - missing_events = yield self._fetch_events( - txn, - missing_events, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) + def get_missing(txn=None): + missing_events = yield self._fetch_events( + txn, + missing_events, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) - event_map.update(missing_events) + event_map.update(missing_events) + + defer.returnValue([ + event_map[e_id] for e_id in event_ids + if e_id in event_map and event_map[e_id] + ]) + + if missing_events and get_prev_content and not txn: + if get_prev_content and not txn: + # If we want prev_content then lets just jump into a txn. + res = yield self.runInteraction("_get_events", get_missing) + defer.returnValue(res) + + defer.returnValue(get_missing()) - defer.returnValue([ - event_map[e_id] for e_id in event_ids - if e_id in event_map and event_map[e_id] - ]) def _get_events_txn(self, txn, event_ids, check_redacted=True, get_prev_content=False, allow_rejected=False): -- cgit 1.4.1 From 656223fbd3a87b278b8101175e0ac117a059a812 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 14:26:35 +0100 Subject: Actually, we probably want to run this in a transaction --- synapse/storage/_base.py | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index d896f5f918..e44821b5ec 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -881,32 +881,29 @@ class SQLBaseStore(object): allow_rejected=allow_rejected, ) - missing_events = [e for e in event_ids if e not in event_map] + missing_events_ids = [e for e in event_ids if e not in event_map] - def get_missing(txn=None): - missing_events = yield self._fetch_events( + def get_missing(txn): + missing_events = unwrap_deferred(self._fetch_events( txn, - missing_events, + missing_events_ids, check_redacted=check_redacted, get_prev_content=get_prev_content, allow_rejected=allow_rejected, - ) + )) event_map.update(missing_events) - defer.returnValue([ + return [ event_map[e_id] for e_id in event_ids if e_id in event_map and event_map[e_id] - ]) - - if missing_events and get_prev_content and not txn: - if get_prev_content and not txn: - # If we want prev_content then lets just jump into a txn. - res = yield self.runInteraction("_get_events", get_missing) - defer.returnValue(res) - - defer.returnValue(get_missing()) + ] + if not txn: + res = yield self.runInteraction("_get_events", get_missing) + defer.returnValue(res) + else: + defer.returnValue(get_missing(txn)) def _get_events_txn(self, txn, event_ids, check_redacted=True, get_prev_content=False, allow_rejected=False): -- cgit 1.4.1 From 7d6a1dae31c96379f29bc9e4191d2c02f1dad640 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 14:27:58 +0100 Subject: Jump out early --- synapse/storage/_base.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e44821b5ec..1a76b22e4f 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -883,6 +883,12 @@ class SQLBaseStore(object): missing_events_ids = [e for e in event_ids if e not in event_map] + if not missing_events_ids: + defer.returnValue([ + event_map[e_id] for e_id in event_ids + if e_id in event_map and event_map[e_id] + ]) + def get_missing(txn): missing_events = unwrap_deferred(self._fetch_events( txn, -- cgit 1.4.1 From 386b7330d2902fe8acac0efadb095be389700764 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 14:45:22 +0100 Subject: Move from _base to events --- synapse/storage/_base.py | 233 +------------------------------------------ synapse/storage/events.py | 246 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 247 insertions(+), 232 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1a76b22e4f..a8f8989e38 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -15,9 +15,7 @@ import logging from synapse.api.errors import StoreError -from synapse.events import FrozenEvent -from synapse.events.utils import prune_event -from synapse.util import unwrap_deferred + from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_context_over_fn, LoggingContext from synapse.util.lrucache import LruCache @@ -29,7 +27,6 @@ from twisted.internet import defer from collections import namedtuple, OrderedDict import functools -import simplejson as json import sys import time import threading @@ -868,234 +865,6 @@ class SQLBaseStore(object): return self.runInteraction("_simple_max_id", func) - @defer.inlineCallbacks - def _get_events(self, event_ids, check_redacted=True, - get_prev_content=False, allow_rejected=False, txn=None): - if not event_ids: - defer.returnValue([]) - - event_map = self._get_events_from_cache( - event_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - missing_events_ids = [e for e in event_ids if e not in event_map] - - if not missing_events_ids: - defer.returnValue([ - event_map[e_id] for e_id in event_ids - if e_id in event_map and event_map[e_id] - ]) - - def get_missing(txn): - missing_events = unwrap_deferred(self._fetch_events( - txn, - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - )) - - event_map.update(missing_events) - - return [ - event_map[e_id] for e_id in event_ids - if e_id in event_map and event_map[e_id] - ] - - if not txn: - res = yield self.runInteraction("_get_events", get_missing) - defer.returnValue(res) - else: - defer.returnValue(get_missing(txn)) - - def _get_events_txn(self, txn, event_ids, check_redacted=True, - get_prev_content=False, allow_rejected=False): - return unwrap_deferred(self._get_events( - event_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - txn=txn, - )) - - def _invalidate_get_event_cache(self, event_id): - for check_redacted in (False, True): - for get_prev_content in (False, True): - self._get_event_cache.invalidate(event_id, check_redacted, - get_prev_content) - - def _get_event_txn(self, txn, event_id, check_redacted=True, - get_prev_content=False, allow_rejected=False): - - events = self._get_events_txn( - txn, [event_id], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - return events[0] if events else None - - def _get_events_from_cache(self, events, check_redacted, get_prev_content, - allow_rejected): - event_map = {} - - for event_id in events: - try: - ret = self._get_event_cache.get( - event_id, check_redacted, get_prev_content - ) - - if allow_rejected or not ret.rejected_reason: - event_map[event_id] = ret - else: - event_map[event_id] = None - except KeyError: - pass - - return event_map - - @defer.inlineCallbacks - def _fetch_events(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not events: - defer.returnValue({}) - - rows = [] - N = 200 - for i in range(1 + len(events) / N): - evs = events[i*N:(i + 1)*N] - if not evs: - break - - sql = ( - "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " - " FROM event_json as e" - " LEFT JOIN rejections as rej USING (event_id)" - " LEFT JOIN redactions as r ON e.event_id = r.redacts" - " WHERE e.event_id IN (%s)" - ) % (",".join(["?"]*len(evs)),) - - if txn: - txn.execute(sql, evs) - rows.extend(txn.fetchall()) - else: - res = yield self._execute("_fetch_events", None, sql, *evs) - rows.extend(res) - - res = yield defer.gatherResults( - [ - defer.maybeDeferred( - self._get_event_from_row, - txn, - row[0], row[1], row[2], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=row[3], - ) - for row in rows - ], - consumeErrors=True, - ) - - for e in res: - self._get_event_cache.prefill( - e.event_id, check_redacted, get_prev_content, e - ) - - defer.returnValue({ - e.event_id: e - for e in res if e - }) - - @defer.inlineCallbacks - def _get_event_from_row(self, txn, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False, - rejected_reason=None): - d = json.loads(js) - internal_metadata = json.loads(internal_metadata) - - def select(txn, *args, **kwargs): - if txn: - return self._simple_select_one_onecol_txn(txn, *args, **kwargs) - else: - return self._simple_select_one_onecol( - *args, - desc="_get_event_from_row", **kwargs - ) - - def get_event(txn, *args, **kwargs): - if txn: - return self._get_event_txn(txn, *args, **kwargs) - else: - return self.get_event(*args, **kwargs) - - if rejected_reason: - rejected_reason = yield select( - txn, - table="rejections", - keyvalues={"event_id": rejected_reason}, - retcol="reason", - ) - - ev = FrozenEvent( - d, - internal_metadata_dict=internal_metadata, - rejected_reason=rejected_reason, - ) - - if check_redacted and redacted: - ev = prune_event(ev) - - redaction_id = yield select( - txn, - table="redactions", - keyvalues={"redacts": ev.event_id}, - retcol="event_id", - ) - - ev.unsigned["redacted_by"] = redaction_id - # Get the redaction event. - - because = yield get_event( - txn, - redaction_id, - check_redacted=False - ) - - if because: - ev.unsigned["redacted_because"] = because - - if get_prev_content and "replaces_state" in ev.unsigned: - prev = yield get_event( - txn, - ev.unsigned["replaces_state"], - get_prev_content=False, - ) - if prev: - ev.unsigned["prev_content"] = prev.get_dict()["content"] - - defer.returnValue(ev) - - def _parse_events(self, rows): - return self.runInteraction( - "_parse_events", self._parse_events_txn, rows - ) - - def _parse_events_txn(self, txn, rows): - event_ids = [r["event_id"] for r in rows] - - return self._get_events_txn(txn, event_ids) - - def _has_been_redacted_txn(self, txn, event): - sql = "SELECT event_id FROM redactions WHERE redacts = ?" - txn.execute(sql, (event.event_id,)) - result = txn.fetchone() - return result[0] if result else None - def get_next_stream_id(self): with self._next_stream_id_lock: i = self._next_stream_id diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 9242b0a84e..f960ef8350 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -17,6 +17,10 @@ from _base import SQLBaseStore, _RollbackButIsFineException from twisted.internet import defer +from synapse.events import FrozenEvent +from synapse.events.utils import prune_event +from synapse.util import unwrap_deferred + from synapse.util.logutils import log_function from synapse.api.constants import EventTypes from synapse.crypto.event_signing import compute_event_reference_hash @@ -26,6 +30,7 @@ from syutil.jsonutil import encode_canonical_json from contextlib import contextmanager import logging +import simplejson as json logger = logging.getLogger(__name__) @@ -393,3 +398,244 @@ class EventsStore(SQLBaseStore): return self.runInteraction( "have_events", f, ) + + @defer.inlineCallbacks + def _get_events(self, event_ids, check_redacted=True, + get_prev_content=False, allow_rejected=False, txn=None): + if not event_ids: + defer.returnValue([]) + + event_map = self._get_events_from_cache( + event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + missing_events_ids = [e for e in event_ids if e not in event_map] + + if not missing_events_ids: + defer.returnValue([ + event_map[e_id] for e_id in event_ids + if e_id in event_map and event_map[e_id] + ]) + + if not txn: + missing_events = yield self.runInteraction( + "_get_events", + self._fetch_events_txn, + missing_events_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + else: + missing_events = yield self._fetch_events( + txn, + missing_events_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + event_map.update(missing_events) + + defer.returnValue([ + event_map[e_id] for e_id in event_ids + if e_id in event_map and event_map[e_id] + ]) + + + def _get_events_txn(self, txn, event_ids, check_redacted=True, + get_prev_content=False, allow_rejected=False): + return unwrap_deferred(self._get_events( + event_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + txn=txn, + )) + + def _invalidate_get_event_cache(self, event_id): + for check_redacted in (False, True): + for get_prev_content in (False, True): + self._get_event_cache.invalidate(event_id, check_redacted, + get_prev_content) + + def _get_event_txn(self, txn, event_id, check_redacted=True, + get_prev_content=False, allow_rejected=False): + + events = self._get_events_txn( + txn, [event_id], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + return events[0] if events else None + + def _get_events_from_cache(self, events, check_redacted, get_prev_content, + allow_rejected): + event_map = {} + + for event_id in events: + try: + ret = self._get_event_cache.get( + event_id, check_redacted, get_prev_content + ) + + if allow_rejected or not ret.rejected_reason: + event_map[event_id] = ret + else: + event_map[event_id] = None + except KeyError: + pass + + return event_map + + def _fetch_events_txn(self, txn, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): + return unwrap_deferred(self._fetch_events( + txn, events, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + )) + + @defer.inlineCallbacks + def _fetch_events(self, txn, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): + if not events: + defer.returnValue({}) + + rows = [] + N = 200 + for i in range(1 + len(events) / N): + evs = events[i*N:(i + 1)*N] + if not evs: + break + + sql = ( + "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " + " FROM event_json as e" + " LEFT JOIN rejections as rej USING (event_id)" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " WHERE e.event_id IN (%s)" + ) % (",".join(["?"]*len(evs)),) + + if txn: + txn.execute(sql, evs) + rows.extend(txn.fetchall()) + else: + res = yield self._execute("_fetch_events", None, sql, *evs) + rows.extend(res) + + res = yield defer.gatherResults( + [ + defer.maybeDeferred( + self._get_event_from_row, + txn, + row[0], row[1], row[2], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=row[3], + ) + for row in rows + ], + consumeErrors=True, + ) + + for e in res: + self._get_event_cache.prefill( + e.event_id, check_redacted, get_prev_content, e + ) + + defer.returnValue({ + e.event_id: e + for e in res if e + }) + + @defer.inlineCallbacks + def _get_event_from_row(self, txn, internal_metadata, js, redacted, + check_redacted=True, get_prev_content=False, + rejected_reason=None): + d = json.loads(js) + internal_metadata = json.loads(internal_metadata) + + def select(txn, *args, **kwargs): + if txn: + return self._simple_select_one_onecol_txn(txn, *args, **kwargs) + else: + return self._simple_select_one_onecol( + *args, + desc="_get_event_from_row", **kwargs + ) + + def get_event(txn, *args, **kwargs): + if txn: + return self._get_event_txn(txn, *args, **kwargs) + else: + return self.get_event(*args, **kwargs) + + if rejected_reason: + rejected_reason = yield select( + txn, + table="rejections", + keyvalues={"event_id": rejected_reason}, + retcol="reason", + ) + + ev = FrozenEvent( + d, + internal_metadata_dict=internal_metadata, + rejected_reason=rejected_reason, + ) + + if check_redacted and redacted: + ev = prune_event(ev) + + redaction_id = yield select( + txn, + table="redactions", + keyvalues={"redacts": ev.event_id}, + retcol="event_id", + ) + + ev.unsigned["redacted_by"] = redaction_id + # Get the redaction event. + + because = yield get_event( + txn, + redaction_id, + check_redacted=False + ) + + if because: + ev.unsigned["redacted_because"] = because + + if get_prev_content and "replaces_state" in ev.unsigned: + prev = yield get_event( + txn, + ev.unsigned["replaces_state"], + get_prev_content=False, + ) + if prev: + ev.unsigned["prev_content"] = prev.get_dict()["content"] + + defer.returnValue(ev) + + def _parse_events(self, rows): + return self.runInteraction( + "_parse_events", self._parse_events_txn, rows + ) + + def _parse_events_txn(self, txn, rows): + event_ids = [r["event_id"] for r in rows] + + return self._get_events_txn(txn, event_ids) + + def _has_been_redacted_txn(self, txn, event): + sql = "SELECT event_id FROM redactions WHERE redacts = ?" + txn.execute(sql, (event.event_id,)) + result = txn.fetchone() + return result[0] if result else None -- cgit 1.4.1 From f4d58deba19bde81b629c8ceb8df3aca60aa1e15 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 14:45:42 +0100 Subject: PEP8 --- synapse/storage/events.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index f960ef8350..28d2c0896b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -445,7 +445,6 @@ class EventsStore(SQLBaseStore): if e_id in event_map and event_map[e_id] ]) - def _get_events_txn(self, txn, event_ids, check_redacted=True, get_prev_content=False, allow_rejected=False): return unwrap_deferred(self._get_events( @@ -494,7 +493,7 @@ class EventsStore(SQLBaseStore): return event_map def _fetch_events_txn(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): + get_prev_content=False, allow_rejected=False): return unwrap_deferred(self._fetch_events( txn, events, check_redacted=check_redacted, -- cgit 1.4.1 From 7f4105a5c99d72662db0704ab03bff24a951005b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 14:51:06 +0100 Subject: Turn off preemptive transactions --- synapse/storage/events.py | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 28d2c0896b..0aa4e0d445 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -420,23 +420,13 @@ class EventsStore(SQLBaseStore): if e_id in event_map and event_map[e_id] ]) - if not txn: - missing_events = yield self.runInteraction( - "_get_events", - self._fetch_events_txn, - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - else: - missing_events = yield self._fetch_events( - txn, - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) + missing_events = yield self._fetch_events( + txn, + missing_events_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) event_map.update(missing_events) -- cgit 1.4.1 From 7cd6a6f6cf03e5bfde99b839bdc67f8f5513cdc5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 15:34:02 +0100 Subject: Awful idea for speeding up fetching of events --- synapse/storage/_base.py | 4 ++ synapse/storage/events.py | 167 ++++++++++++++++++++++++++++++++++++---------- synapse/util/__init__.py | 8 +-- 3 files changed, 139 insertions(+), 40 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index a8f8989e38..c20ff3a572 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -299,6 +299,10 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) + self._event_fetch_lock = threading.Lock() + self._event_fetch_list = [] + self._event_fetch_ongoing = False + self.database_engine = hs.database_engine self._stream_id_gen = StreamIdGenerator() diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0aa4e0d445..be88328ce5 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -15,7 +15,7 @@ from _base import SQLBaseStore, _RollbackButIsFineException -from twisted.internet import defer +from twisted.internet import defer, reactor from synapse.events import FrozenEvent from synapse.events.utils import prune_event @@ -89,18 +89,17 @@ class EventsStore(SQLBaseStore): Returns: Deferred : A FrozenEvent. """ - event = yield self.runInteraction( - "get_event", self._get_event_txn, - event_id, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, + events = yield self._get_events( + [event_id], + check_redacted=True, + get_prev_content=False, + allow_rejected=False, ) - if not event and not allow_none: + if not events and not allow_none: raise RuntimeError("Could not find event %s" % (event_id,)) - defer.returnValue(event) + defer.returnValue(events[0] if events else None) @log_function def _persist_event_txn(self, txn, event, context, backfilled, @@ -420,13 +419,21 @@ class EventsStore(SQLBaseStore): if e_id in event_map and event_map[e_id] ]) - missing_events = yield self._fetch_events( - txn, - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) + if not txn: + missing_events = yield self._enqueue_events( + missing_events_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + else: + missing_events = self._fetch_events_txn( + txn, + missing_events_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) event_map.update(missing_events) @@ -492,11 +499,82 @@ class EventsStore(SQLBaseStore): )) @defer.inlineCallbacks - def _fetch_events(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): + def _enqueue_events(self, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): if not events: defer.returnValue({}) + def do_fetch(txn): + event_list = [] + try: + with self._event_fetch_lock: + event_list = self._event_fetch_list + self._event_fetch_list = [] + + if not event_list: + return + + event_id_lists = zip(*event_list)[0] + event_ids = [ + item for sublist in event_id_lists for item in sublist + ] + rows = self._fetch_event_rows(txn, event_ids) + + row_dict = { + r["event_id"]: r + for r in rows + } + + for ids, d in event_list: + d.callback( + [ + row_dict[i] for i in ids + if i in row_dict + ] + ) + except Exception as e: + for _, d in event_list: + try: + reactor.callFromThread(d.errback, e) + except: + pass + finally: + with self._event_fetch_lock: + self._event_fetch_ongoing = False + + def cb(rows): + return defer.gatherResults([ + self._get_event_from_row( + None, + row["internal_metadata"], row["json"], row["redacts"], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=row["rejects"], + ) + for row in rows + ]) + + d = defer.Deferred() + d.addCallback(cb) + with self._event_fetch_lock: + self._event_fetch_list.append( + (events, d) + ) + + if not self._event_fetch_ongoing: + self.runInteraction( + "do_fetch", + do_fetch + ) + + res = yield d + + defer.returnValue({ + e.event_id: e + for e in res if e + }) + + def _fetch_event_rows(self, txn, events): rows = [] N = 200 for i in range(1 + len(events) / N): @@ -505,43 +583,56 @@ class EventsStore(SQLBaseStore): break sql = ( - "SELECT e.internal_metadata, e.json, r.redacts, rej.event_id " + "SELECT " + " e.event_id as event_id, " + " e.internal_metadata," + " e.json," + " r.redacts as redacts," + " rej.event_id as rejects " " FROM event_json as e" " LEFT JOIN rejections as rej USING (event_id)" " LEFT JOIN redactions as r ON e.event_id = r.redacts" " WHERE e.event_id IN (%s)" ) % (",".join(["?"]*len(evs)),) - if txn: - txn.execute(sql, evs) - rows.extend(txn.fetchall()) - else: - res = yield self._execute("_fetch_events", None, sql, *evs) - rows.extend(res) + txn.execute(sql, evs) + rows.extend(self.cursor_to_dict(txn)) + + return rows + + @defer.inlineCallbacks + def _fetch_events(self, txn, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): + if not events: + defer.returnValue({}) + + if txn: + rows = self._fetch_event_rows( + txn, events, + ) + else: + rows = yield self.runInteraction( + self._fetch_event_rows, + events, + ) res = yield defer.gatherResults( [ defer.maybeDeferred( self._get_event_from_row, txn, - row[0], row[1], row[2], + row["internal_metadata"], row["json"], row["redacts"], check_redacted=check_redacted, get_prev_content=get_prev_content, - rejected_reason=row[3], + rejected_reason=row["rejects"], ) for row in rows - ], - consumeErrors=True, + ] ) - for e in res: - self._get_event_cache.prefill( - e.event_id, check_redacted, get_prev_content, e - ) - defer.returnValue({ - e.event_id: e - for e in res if e + r.event_id: r + for r in res }) @defer.inlineCallbacks @@ -611,6 +702,10 @@ class EventsStore(SQLBaseStore): if prev: ev.unsigned["prev_content"] = prev.get_dict()["content"] + self._get_event_cache.prefill( + ev.event_id, check_redacted, get_prev_content, ev + ) + defer.returnValue(ev) def _parse_events(self, rows): diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index b9afb3364d..260714ccc2 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -80,16 +80,16 @@ class Clock(object): def stop_looping_call(self, loop): loop.stop() - def call_later(self, delay, callback): + def call_later(self, delay, callback, *args, **kwargs): current_context = LoggingContext.current_context() - def wrapped_callback(): + def wrapped_callback(*args, **kwargs): with PreserveLoggingContext(): LoggingContext.thread_local.current_context = current_context - callback() + callback(*args, **kwargs) with PreserveLoggingContext(): - return reactor.callLater(delay, wrapped_callback) + return reactor.callLater(delay, wrapped_callback, *args, **kwargs) def cancel_call_later(self, timer): timer.cancel() -- cgit 1.4.1 From 96c5b9f87cd5d959d0976487399d4b913082598d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 15:36:04 +0100 Subject: Don't start up more fetch_events --- synapse/storage/events.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index be88328ce5..0859518b1b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -566,6 +566,7 @@ class EventsStore(SQLBaseStore): "do_fetch", do_fetch ) + self._event_fetch_ongoing = True res = yield d -- cgit 1.4.1 From 142934084a374c3e47b63939526827f5afa7410d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 15:40:21 +0100 Subject: Count and loop --- synapse/storage/_base.py | 2 +- synapse/storage/events.py | 70 +++++++++++++++++++++++------------------------ 2 files changed, 35 insertions(+), 37 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c20ff3a572..97bf42469a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -301,7 +301,7 @@ class SQLBaseStore(object): self._event_fetch_lock = threading.Lock() self._event_fetch_list = [] - self._event_fetch_ongoing = False + self._event_fetch_ongoing = 0 self.database_engine = hs.database_engine diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0859518b1b..a6b2e7677f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -506,41 +506,39 @@ class EventsStore(SQLBaseStore): def do_fetch(txn): event_list = [] - try: - with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] - - if not event_list: - return - - event_id_lists = zip(*event_list)[0] - event_ids = [ - item for sublist in event_id_lists for item in sublist - ] - rows = self._fetch_event_rows(txn, event_ids) - - row_dict = { - r["event_id"]: r - for r in rows - } + while True: + try: + with self._event_fetch_lock: + event_list = self._event_fetch_list + self._event_fetch_list = [] + + if not event_list: + return + + event_id_lists = zip(*event_list)[0] + event_ids = [ + item for sublist in event_id_lists for item in sublist + ] + rows = self._fetch_event_rows(txn, event_ids) + + row_dict = { + r["event_id"]: r + for r in rows + } - for ids, d in event_list: - d.callback( - [ - row_dict[i] for i in ids - if i in row_dict - ] - ) - except Exception as e: - for _, d in event_list: - try: - reactor.callFromThread(d.errback, e) - except: - pass - finally: - with self._event_fetch_lock: - self._event_fetch_ongoing = False + for ids, d in event_list: + d.callback( + [ + row_dict[i] for i in ids + if i in row_dict + ] + ) + except Exception as e: + for _, d in event_list: + try: + reactor.callFromThread(d.errback, e) + except: + pass def cb(rows): return defer.gatherResults([ @@ -561,12 +559,12 @@ class EventsStore(SQLBaseStore): (events, d) ) - if not self._event_fetch_ongoing: + if self._event_fetch_ongoing < 3: + self._event_fetch_ongoing += 1 self.runInteraction( "do_fetch", do_fetch ) - self._event_fetch_ongoing = True res = yield d -- cgit 1.4.1 From ef3d8754f51e38375dc8d5144d2224d5e86ce458 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 15:41:55 +0100 Subject: Call from right thread --- synapse/storage/events.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a6b2e7677f..59af21a2ca 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -513,6 +513,7 @@ class EventsStore(SQLBaseStore): self._event_fetch_list = [] if not event_list: + self._event_fetch_ongoing -= 1 return event_id_lists = zip(*event_list)[0] @@ -527,7 +528,8 @@ class EventsStore(SQLBaseStore): } for ids, d in event_list: - d.callback( + reactor.callFromThread( + d.callback, [ row_dict[i] for i in ids if i in row_dict -- cgit 1.4.1 From 1d566edb81e1dffea026d4e603a12cee664a8eda Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 16:54:35 +0100 Subject: Remove race condition --- synapse/storage/_base.py | 168 +++++++++++++++++++++++------------- synapse/storage/engines/postgres.py | 2 + synapse/storage/engines/sqlite3.py | 2 + synapse/storage/events.py | 81 +++++++++-------- 4 files changed, 157 insertions(+), 96 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 97bf42469a..ceff99c16d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -26,6 +26,8 @@ from util.id_generators import IdGenerator, StreamIdGenerator from twisted.internet import defer from collections import namedtuple, OrderedDict + +import contextlib import functools import sys import time @@ -299,7 +301,7 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) - self._event_fetch_lock = threading.Lock() + self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0 @@ -342,6 +344,84 @@ class SQLBaseStore(object): self._clock.looping_call(loop, 10000) + @contextlib.contextmanager + def _new_transaction(self, conn, desc, after_callbacks): + start = time.time() * 1000 + txn_id = self._TXN_ID + + # We don't really need these to be unique, so lets stop it from + # growing really large. + self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1) + + name = "%s-%x" % (desc, txn_id, ) + + transaction_logger.debug("[TXN START] {%s}", name) + + try: + i = 0 + N = 5 + while True: + try: + txn = conn.cursor() + txn = LoggingTransaction( + txn, name, self.database_engine, after_callbacks + ) + except self.database_engine.module.OperationalError as e: + # This can happen if the database disappears mid + # transaction. + logger.warn( + "[TXN OPERROR] {%s} %s %d/%d", + name, e, i, N + ) + if i < N: + i += 1 + try: + conn.rollback() + except self.database_engine.module.Error as e1: + logger.warn( + "[TXN EROLL] {%s} %s", + name, e1, + ) + continue + raise + except self.database_engine.module.DatabaseError as e: + if self.database_engine.is_deadlock(e): + logger.warn("[TXN DEADLOCK] {%s} %d/%d", name, i, N) + if i < N: + i += 1 + try: + conn.rollback() + except self.database_engine.module.Error as e1: + logger.warn( + "[TXN EROLL] {%s} %s", + name, e1, + ) + continue + raise + + try: + yield txn + conn.commit() + return + except: + try: + conn.rollback() + except: + pass + raise + except Exception as e: + logger.debug("[TXN FAIL] {%s} %s", name, e) + raise + finally: + end = time.time() * 1000 + duration = end - start + + transaction_logger.debug("[TXN END] {%s} %f", name, duration) + + self._current_txn_total_time += duration + self._txn_perf_counters.update(desc, start, end) + sql_txn_timer.inc_by(duration, desc) + @defer.inlineCallbacks def runInteraction(self, desc, func, *args, **kwargs): """Wraps the .runInteraction() method on the underlying db_pool.""" @@ -353,83 +433,49 @@ class SQLBaseStore(object): def inner_func(conn, *args, **kwargs): with LoggingContext("runInteraction") as context: + sql_scheduling_timer.inc_by(time.time() * 1000 - start_time) + if self.database_engine.is_connection_closed(conn): logger.debug("Reconnecting closed database connection") conn.reconnect() current_context.copy_to(context) - start = time.time() * 1000 - txn_id = self._TXN_ID + with self._new_transaction(conn, desc, after_callbacks) as txn: + return func(txn, *args, **kwargs) - # We don't really need these to be unique, so lets stop it from - # growing really large. - self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1) + result = yield preserve_context_over_fn( + self._db_pool.runWithConnection, + inner_func, *args, **kwargs + ) - name = "%s-%x" % (desc, txn_id, ) + for after_callback, after_args in after_callbacks: + after_callback(*after_args) + defer.returnValue(result) + @defer.inlineCallbacks + def runWithConnection(self, func, *args, **kwargs): + """Wraps the .runInteraction() method on the underlying db_pool.""" + current_context = LoggingContext.current_context() + + start_time = time.time() * 1000 + + def inner_func(conn, *args, **kwargs): + with LoggingContext("runWithConnection") as context: sql_scheduling_timer.inc_by(time.time() * 1000 - start_time) - transaction_logger.debug("[TXN START] {%s}", name) - try: - i = 0 - N = 5 - while True: - try: - txn = conn.cursor() - txn = LoggingTransaction( - txn, name, self.database_engine, after_callbacks - ) - return func(txn, *args, **kwargs) - except self.database_engine.module.OperationalError as e: - # This can happen if the database disappears mid - # transaction. - logger.warn( - "[TXN OPERROR] {%s} %s %d/%d", - name, e, i, N - ) - if i < N: - i += 1 - try: - conn.rollback() - except self.database_engine.module.Error as e1: - logger.warn( - "[TXN EROLL] {%s} %s", - name, e1, - ) - continue - except self.database_engine.module.DatabaseError as e: - if self.database_engine.is_deadlock(e): - logger.warn("[TXN DEADLOCK] {%s} %d/%d", name, i, N) - if i < N: - i += 1 - try: - conn.rollback() - except self.database_engine.module.Error as e1: - logger.warn( - "[TXN EROLL] {%s} %s", - name, e1, - ) - continue - raise - except Exception as e: - logger.debug("[TXN FAIL] {%s} %s", name, e) - raise - finally: - end = time.time() * 1000 - duration = end - start - transaction_logger.debug("[TXN END] {%s} %f", name, duration) + if self.database_engine.is_connection_closed(conn): + logger.debug("Reconnecting closed database connection") + conn.reconnect() + + current_context.copy_to(context) - self._current_txn_total_time += duration - self._txn_perf_counters.update(desc, start, end) - sql_txn_timer.inc_by(duration, desc) + return func(conn, *args, **kwargs) result = yield preserve_context_over_fn( self._db_pool.runWithConnection, inner_func, *args, **kwargs ) - for after_callback, after_args in after_callbacks: - after_callback(*after_args) defer.returnValue(result) def cursor_to_dict(self, cursor): diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index a323028546..4a855ffd56 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -19,6 +19,8 @@ from ._base import IncorrectDatabaseSetup class PostgresEngine(object): + single_threaded = False + def __init__(self, database_module): self.module = database_module self.module.extensions.register_type(self.module.extensions.UNICODE) diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py index ff13d8006a..d18e2808d1 100644 --- a/synapse/storage/engines/sqlite3.py +++ b/synapse/storage/engines/sqlite3.py @@ -17,6 +17,8 @@ from synapse.storage import prepare_database, prepare_sqlite3_database class Sqlite3Engine(object): + single_threaded = True + def __init__(self, database_module): self.module = database_module diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 59af21a2ca..b4abd83260 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -504,23 +504,26 @@ class EventsStore(SQLBaseStore): if not events: defer.returnValue({}) - def do_fetch(txn): + def do_fetch(conn): event_list = [] while True: try: with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] - - if not event_list: + i = 0 + while not self._event_fetch_list: self._event_fetch_ongoing -= 1 return + event_list = self._event_fetch_list + self._event_fetch_list = [] + event_id_lists = zip(*event_list)[0] event_ids = [ item for sublist in event_id_lists for item in sublist ] - rows = self._fetch_event_rows(txn, event_ids) + + with self._new_transaction(conn, "do_fetch", []) as txn: + rows = self._fetch_event_rows(txn, event_ids) row_dict = { r["event_id"]: r @@ -528,22 +531,44 @@ class EventsStore(SQLBaseStore): } for ids, d in event_list: - reactor.callFromThread( - d.callback, - [ - row_dict[i] for i in ids - if i in row_dict - ] - ) + def fire(): + if not d.called: + d.callback( + [ + row_dict[i] + for i in ids + if i in row_dict + ] + ) + reactor.callFromThread(fire) except Exception as e: + logger.exception("do_fetch") for _, d in event_list: - try: + if not d.called: reactor.callFromThread(d.errback, e) - except: - pass - def cb(rows): - return defer.gatherResults([ + with self._event_fetch_lock: + self._event_fetch_ongoing -= 1 + return + + events_d = defer.Deferred() + with self._event_fetch_lock: + self._event_fetch_list.append( + (events, events_d) + ) + + self._event_fetch_lock.notify_all() + + # if self._event_fetch_ongoing < 5: + self._event_fetch_ongoing += 1 + self.runWithConnection( + do_fetch + ) + + rows = yield events_d + + res = yield defer.gatherResults( + [ self._get_event_from_row( None, row["internal_metadata"], row["json"], row["redacts"], @@ -552,23 +577,9 @@ class EventsStore(SQLBaseStore): rejected_reason=row["rejects"], ) for row in rows - ]) - - d = defer.Deferred() - d.addCallback(cb) - with self._event_fetch_lock: - self._event_fetch_list.append( - (events, d) - ) - - if self._event_fetch_ongoing < 3: - self._event_fetch_ongoing += 1 - self.runInteraction( - "do_fetch", - do_fetch - ) - - res = yield d + ], + consumeErrors=True + ) defer.returnValue({ e.event_id: e -- cgit 1.4.1 From a2c4f3f150f63c720370f6882da804c8ac20fd69 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 10:54:04 +0100 Subject: Fix daedlock --- synapse/federation/federation_client.py | 15 +++- synapse/federation/federation_server.py | 2 + synapse/handlers/message.py | 33 ++++++--- synapse/storage/_base.py | 26 +++---- synapse/storage/events.py | 125 +++++++++++++++++++------------- synapse/storage/stream.py | 2 + tests/storage/test_base.py | 3 +- 7 files changed, 122 insertions(+), 84 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 904c7c0945..c255df1bbb 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -222,7 +222,7 @@ class FederationClient(FederationBase): for p in transaction_data["pdus"] ] - if pdu_list: + if pdu_list and pdu_list[0]: pdu = pdu_list[0] # Check signatures are correct. @@ -255,7 +255,7 @@ class FederationClient(FederationBase): ) continue - if self._get_pdu_cache is not None: + if self._get_pdu_cache is not None and pdu: self._get_pdu_cache[event_id] = pdu defer.returnValue(pdu) @@ -475,6 +475,9 @@ class FederationClient(FederationBase): limit (int): Maximum number of events to return. min_depth (int): Minimum depth of events tor return. """ + logger.debug("get_missing_events: latest_events: %r", latest_events) + logger.debug("get_missing_events: earliest_events_ids: %r", earliest_events_ids) + try: content = yield self.transport_layer.get_missing_events( destination=destination, @@ -485,6 +488,8 @@ class FederationClient(FederationBase): min_depth=min_depth, ) + logger.debug("get_missing_events: Got content: %r", content) + events = [ self.event_from_pdu_json(e) for e in content.get("events", []) @@ -494,6 +499,8 @@ class FederationClient(FederationBase): destination, events, outlier=False ) + logger.debug("get_missing_events: signed_events: %r", signed_events) + have_gotten_all_from_destination = True except HttpResponseException as e: if not e.code == 400: @@ -518,6 +525,8 @@ class FederationClient(FederationBase): # Are we missing any? seen_events = set(earliest_events_ids) + + logger.debug("get_missing_events: signed_events2: %r", signed_events) seen_events.update(e.event_id for e in signed_events) missing_events = {} @@ -561,7 +570,7 @@ class FederationClient(FederationBase): res = yield defer.DeferredList(deferreds, consumeErrors=True) for (result, val), (e_id, _) in zip(res, ordered_missing): - if result: + if result and val: signed_events.append(val) else: failed_to_fetch.add(e_id) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index cd79e23f4b..2c6488dd1b 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -415,6 +415,8 @@ class FederationServer(FederationBase): pdu.internal_metadata.outlier = True elif min_depth and pdu.depth > min_depth: if get_missing and prevs - seen: + logger.debug("We're missing: %r", prevs-seen) + latest = yield self.store.get_latest_event_ids_in_room( pdu.room_id ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 867fdbefb0..6a1b25d112 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -303,18 +303,27 @@ class MessageHandler(BaseHandler): if event.membership != Membership.JOIN: return try: - (messages, token), current_state = yield defer.gatherResults( - [ - self.store.get_recent_events_for_room( - event.room_id, - limit=limit, - end_token=now_token.room_key, - ), - self.state_handler.get_current_state( - event.room_id - ), - ] - ).addErrback(unwrapFirstError) + # (messages, token), current_state = yield defer.gatherResults( + # [ + # self.store.get_recent_events_for_room( + # event.room_id, + # limit=limit, + # end_token=now_token.room_key, + # ), + # self.state_handler.get_current_state( + # event.room_id + # ), + # ] + # ).addErrback(unwrapFirstError) + + messages, token = yield self.store.get_recent_events_for_room( + event.room_id, + limit=limit, + end_token=now_token.room_key, + ) + current_state = yield self.state_handler.get_current_state( + event.room_id + ) start_token = now_token.copy_and_replace("room_key", token[0]) end_token = now_token.copy_and_replace("room_key", token[1]) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index ceff99c16d..0df1b46edc 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -301,10 +301,12 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) - self._event_fetch_lock = threading.Condition() + self._event_fetch_lock = threading.Lock() self._event_fetch_list = [] self._event_fetch_ongoing = 0 + self._pending_ds = [] + self.database_engine = hs.database_engine self._stream_id_gen = StreamIdGenerator() @@ -344,8 +346,7 @@ class SQLBaseStore(object): self._clock.looping_call(loop, 10000) - @contextlib.contextmanager - def _new_transaction(self, conn, desc, after_callbacks): + def _new_transaction(self, conn, desc, after_callbacks, func, *args, **kwargs): start = time.time() * 1000 txn_id = self._TXN_ID @@ -366,6 +367,9 @@ class SQLBaseStore(object): txn = LoggingTransaction( txn, name, self.database_engine, after_callbacks ) + r = func(txn, *args, **kwargs) + conn.commit() + return r except self.database_engine.module.OperationalError as e: # This can happen if the database disappears mid # transaction. @@ -398,17 +402,6 @@ class SQLBaseStore(object): ) continue raise - - try: - yield txn - conn.commit() - return - except: - try: - conn.rollback() - except: - pass - raise except Exception as e: logger.debug("[TXN FAIL] {%s} %s", name, e) raise @@ -440,8 +433,9 @@ class SQLBaseStore(object): conn.reconnect() current_context.copy_to(context) - with self._new_transaction(conn, desc, after_callbacks) as txn: - return func(txn, *args, **kwargs) + return self._new_transaction( + conn, desc, after_callbacks, func, *args, **kwargs + ) result = yield preserve_context_over_fn( self._db_pool.runWithConnection, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index b4abd83260..260bdf0ec4 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -420,12 +420,14 @@ class EventsStore(SQLBaseStore): ]) if not txn: + logger.debug("enqueue before") missing_events = yield self._enqueue_events( missing_events_ids, check_redacted=check_redacted, get_prev_content=get_prev_content, allow_rejected=allow_rejected, ) + logger.debug("enqueue after") else: missing_events = self._fetch_events_txn( txn, @@ -498,41 +500,39 @@ class EventsStore(SQLBaseStore): allow_rejected=allow_rejected, )) - @defer.inlineCallbacks - def _enqueue_events(self, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not events: - defer.returnValue({}) - - def do_fetch(conn): - event_list = [] + def _do_fetch(self, conn): + event_list = [] + try: while True: - try: - with self._event_fetch_lock: - i = 0 - while not self._event_fetch_list: - self._event_fetch_ongoing -= 1 - return - - event_list = self._event_fetch_list - self._event_fetch_list = [] - - event_id_lists = zip(*event_list)[0] - event_ids = [ - item for sublist in event_id_lists for item in sublist - ] - - with self._new_transaction(conn, "do_fetch", []) as txn: - rows = self._fetch_event_rows(txn, event_ids) - - row_dict = { - r["event_id"]: r - for r in rows - } + logger.debug("do_fetch getting lock") + with self._event_fetch_lock: + logger.debug("do_fetch go lock: %r", self._event_fetch_list) + event_list = self._event_fetch_list + self._event_fetch_list = [] + if not event_list: + self._event_fetch_ongoing -= 1 + return + + event_id_lists = zip(*event_list)[0] + event_ids = [ + item for sublist in event_id_lists for item in sublist + ] + + rows = self._new_transaction( + conn, "do_fetch", [], self._fetch_event_rows, event_ids + ) - for ids, d in event_list: - def fire(): - if not d.called: + row_dict = { + r["event_id"]: r + for r in rows + } + + logger.debug("do_fetch got events: %r", row_dict.keys()) + + def fire(evs): + for ids, d in evs: + if not d.called: + try: d.callback( [ row_dict[i] @@ -540,32 +540,51 @@ class EventsStore(SQLBaseStore): if i in row_dict ] ) - reactor.callFromThread(fire) - except Exception as e: - logger.exception("do_fetch") - for _, d in event_list: - if not d.called: - reactor.callFromThread(d.errback, e) + except: + logger.exception("Failed to callback") + reactor.callFromThread(fire, event_list) + except Exception as e: + logger.exception("do_fetch") - with self._event_fetch_lock: - self._event_fetch_ongoing -= 1 - return + def fire(evs): + for _, d in evs: + if not d.called: + d.errback(e) + + if event_list: + reactor.callFromThread(fire, event_list) + + @defer.inlineCallbacks + def _enqueue_events(self, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): + if not events: + defer.returnValue({}) events_d = defer.Deferred() - with self._event_fetch_lock: - self._event_fetch_list.append( - (events, events_d) - ) + try: + logger.debug("enqueueueueue getting lock") + with self._event_fetch_lock: + logger.debug("enqueue go lock") + self._event_fetch_list.append( + (events, events_d) + ) - self._event_fetch_lock.notify_all() + self._event_fetch_ongoing += 1 - # if self._event_fetch_ongoing < 5: - self._event_fetch_ongoing += 1 self.runWithConnection( - do_fetch + self._do_fetch ) - rows = yield events_d + except Exception as e: + if not events_d.called: + events_d.errback(e) + + logger.debug("events_d before") + try: + rows = yield events_d + except: + logger.exception("events_d") + logger.debug("events_d after") res = yield defer.gatherResults( [ @@ -580,6 +599,7 @@ class EventsStore(SQLBaseStore): ], consumeErrors=True ) + logger.debug("gatherResults after") defer.returnValue({ e.event_id: e @@ -639,7 +659,8 @@ class EventsStore(SQLBaseStore): rejected_reason=row["rejects"], ) for row in rows - ] + ], + consumeErrors=True, ) defer.returnValue({ diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index d16b57c515..af45fc5619 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -357,10 +357,12 @@ class StreamStore(SQLBaseStore): "get_recent_events_for_room", get_recent_events_for_room_txn ) + logger.debug("stream before") events = yield self._get_events( [r["event_id"] for r in rows], get_prev_content=True ) + logger.debug("stream after") self._set_before_and_after(events, rows) diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py index 8c348ecc95..8573f18b55 100644 --- a/tests/storage/test_base.py +++ b/tests/storage/test_base.py @@ -33,8 +33,9 @@ class SQLBaseStoreTestCase(unittest.TestCase): def setUp(self): self.db_pool = Mock(spec=["runInteraction"]) self.mock_txn = Mock() - self.mock_conn = Mock(spec_set=["cursor"]) + self.mock_conn = Mock(spec_set=["cursor", "rollback", "commit"]) self.mock_conn.cursor.return_value = self.mock_txn + self.mock_conn.rollback.return_value = None # Our fake runInteraction just runs synchronously inline def runInteraction(func, *args, **kwargs): -- cgit 1.4.1 From de01438a578c285b632a4c791a56bebfe29fb06b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 11:00:50 +0100 Subject: Sort out error handling --- synapse/storage/events.py | 47 ++++++++++++++++++++++------------------------- 1 file changed, 22 insertions(+), 25 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 260bdf0ec4..f2c181dde7 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -502,8 +502,8 @@ class EventsStore(SQLBaseStore): def _do_fetch(self, conn): event_list = [] - try: - while True: + while True: + try: logger.debug("do_fetch getting lock") with self._event_fetch_lock: logger.debug("do_fetch go lock: %r", self._event_fetch_list) @@ -543,16 +543,16 @@ class EventsStore(SQLBaseStore): except: logger.exception("Failed to callback") reactor.callFromThread(fire, event_list) - except Exception as e: - logger.exception("do_fetch") + except Exception as e: + logger.exception("do_fetch") - def fire(evs): - for _, d in evs: - if not d.called: - d.errback(e) + def fire(evs): + for _, d in evs: + if not d.called: + d.errback(e) - if event_list: - reactor.callFromThread(fire, event_list) + if event_list: + reactor.callFromThread(fire, event_list) @defer.inlineCallbacks def _enqueue_events(self, events, check_redacted=True, @@ -561,29 +561,26 @@ class EventsStore(SQLBaseStore): defer.returnValue({}) events_d = defer.Deferred() - try: - logger.debug("enqueueueueue getting lock") - with self._event_fetch_lock: - logger.debug("enqueue go lock") - self._event_fetch_list.append( - (events, events_d) - ) + logger.debug("enqueueueueue getting lock") + with self._event_fetch_lock: + logger.debug("enqueue go lock") + self._event_fetch_list.append( + (events, events_d) + ) + if self._event_fetch_ongoing < 1: self._event_fetch_ongoing += 1 + should_start = True + else: + should_start = False + if should_start: self.runWithConnection( self._do_fetch ) - except Exception as e: - if not events_d.called: - events_d.errback(e) - logger.debug("events_d before") - try: - rows = yield events_d - except: - logger.exception("events_d") + rows = yield events_d logger.debug("events_d after") res = yield defer.gatherResults( -- cgit 1.4.1 From 575ec91d82e6b283bd21471c5a874de968c97bff Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 11:15:10 +0100 Subject: Correctly pass through params --- synapse/storage/events.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index f2c181dde7..143c24b10d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -91,9 +91,9 @@ class EventsStore(SQLBaseStore): """ events = yield self._get_events( [event_id], - check_redacted=True, - get_prev_content=False, - allow_rejected=False, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, ) if not events and not allow_none: -- cgit 1.4.1 From 372d4c6d7b38f89fb79509cf432915d96bdc8164 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 11:26:00 +0100 Subject: Srsly. Don't use closures. Baaaaaad --- synapse/storage/events.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 143c24b10d..2c3e6d5a5c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -529,20 +529,18 @@ class EventsStore(SQLBaseStore): logger.debug("do_fetch got events: %r", row_dict.keys()) - def fire(evs): - for ids, d in evs: + def fire(lst, res): + for ids, d in lst: if not d.called: try: - d.callback( - [ - row_dict[i] - for i in ids - if i in row_dict - ] - ) + d.callback([ + res[i] + for i in ids + if i in res + ]) except: logger.exception("Failed to callback") - reactor.callFromThread(fire, event_list) + reactor.callFromThread(fire, event_list, row_dict) except Exception as e: logger.exception("do_fetch") -- cgit 1.4.1 From aa32bd38e40cd8d69406fe74581290ad7fe34f35 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 11:35:04 +0100 Subject: Add a wait --- synapse/storage/_base.py | 2 +- synapse/storage/events.py | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 0df1b46edc..5d86aa5cd4 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -301,7 +301,7 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) - self._event_fetch_lock = threading.Lock() + self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0 diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2c3e6d5a5c..f694b877f4 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -502,6 +502,7 @@ class EventsStore(SQLBaseStore): def _do_fetch(self, conn): event_list = [] + i = 0 while True: try: logger.debug("do_fetch getting lock") @@ -510,8 +511,14 @@ class EventsStore(SQLBaseStore): event_list = self._event_fetch_list self._event_fetch_list = [] if not event_list: - self._event_fetch_ongoing -= 1 - return + if self.database_engine.single_threaded or i > 5: + self._event_fetch_ongoing -= 1 + return + else: + self._event_fetch_lock.wait(0.1) + i += 1 + continue + i = 0 event_id_lists = zip(*event_list)[0] event_ids = [ @@ -566,6 +573,8 @@ class EventsStore(SQLBaseStore): (events, events_d) ) + self._event_fetch_lock.notify_all() + if self._event_fetch_ongoing < 1: self._event_fetch_ongoing += 1 should_start = True -- cgit 1.4.1 From e275a9c0d9365f77cb06f2a04c55f591ac5b54c7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 11:54:51 +0100 Subject: preserve log context --- synapse/storage/events.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index f694b877f4..c621516039 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -21,6 +21,7 @@ from synapse.events import FrozenEvent from synapse.events.utils import prune_event from synapse.util import unwrap_deferred +from synapse.util.logcontext import preserve_context_over_deferred from synapse.util.logutils import log_function from synapse.api.constants import EventTypes from synapse.crypto.event_signing import compute_event_reference_hash @@ -587,7 +588,7 @@ class EventsStore(SQLBaseStore): ) logger.debug("events_d before") - rows = yield events_d + rows = yield preserve_context_over_deferred(events_d) logger.debug("events_d after") res = yield defer.gatherResults( -- cgit 1.4.1 From 0f29cfabc3177c149a4c34fc05398b8d9f3a06ed Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 14:06:42 +0100 Subject: Remove debug logging --- synapse/storage/events.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c621516039..716f10386f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -506,13 +506,11 @@ class EventsStore(SQLBaseStore): i = 0 while True: try: - logger.debug("do_fetch getting lock") with self._event_fetch_lock: - logger.debug("do_fetch go lock: %r", self._event_fetch_list) event_list = self._event_fetch_list self._event_fetch_list = [] if not event_list: - if self.database_engine.single_threaded or i > 5: + if self.database_engine.single_threaded or i > 3: self._event_fetch_ongoing -= 1 return else: @@ -535,8 +533,6 @@ class EventsStore(SQLBaseStore): for r in rows } - logger.debug("do_fetch got events: %r", row_dict.keys()) - def fire(lst, res): for ids, d in lst: if not d.called: @@ -567,16 +563,14 @@ class EventsStore(SQLBaseStore): defer.returnValue({}) events_d = defer.Deferred() - logger.debug("enqueueueueue getting lock") with self._event_fetch_lock: - logger.debug("enqueue go lock") self._event_fetch_list.append( (events, events_d) ) - self._event_fetch_lock.notify_all() + self._event_fetch_lock.notify() - if self._event_fetch_ongoing < 1: + if self._event_fetch_ongoing < 3: self._event_fetch_ongoing += 1 should_start = True else: @@ -587,9 +581,7 @@ class EventsStore(SQLBaseStore): self._do_fetch ) - logger.debug("events_d before") rows = yield preserve_context_over_deferred(events_d) - logger.debug("events_d after") res = yield defer.gatherResults( [ -- cgit 1.4.1 From d62dee7eae741034abfd050982b1cfc4e2181258 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 15:06:37 +0100 Subject: Remove more debug logging --- synapse/storage/events.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 716f10386f..3cf2f7cff8 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -421,14 +421,12 @@ class EventsStore(SQLBaseStore): ]) if not txn: - logger.debug("enqueue before") missing_events = yield self._enqueue_events( missing_events_ids, check_redacted=check_redacted, get_prev_content=get_prev_content, allow_rejected=allow_rejected, ) - logger.debug("enqueue after") else: missing_events = self._fetch_events_txn( txn, -- cgit 1.4.1 From acb12cc811d7ce7cb3c5b6544ed28f7d6592ef33 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 15:20:05 +0100 Subject: Make store.get_current_state fetch events asyncly --- synapse/storage/events.py | 1 - synapse/storage/state.py | 7 ++++--- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3cf2f7cff8..066e1aab75 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -594,7 +594,6 @@ class EventsStore(SQLBaseStore): ], consumeErrors=True ) - logger.debug("gatherResults after") defer.returnValue({ e.event_id: e diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 3f5642642d..b3f2a4dfa1 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -159,11 +159,12 @@ class StateStore(SQLBaseStore): args = (room_id, ) txn.execute(sql, args) - results = self.cursor_to_dict(txn) + results = txn.fetchall() - return self._parse_events_txn(txn, results) + return [r[0] for r in results] - events = yield self.runInteraction("get_current_state", f) + event_ids = yield self.runInteraction("get_current_state", f) + events = yield self._get_events(event_ids) defer.returnValue(events) -- cgit 1.4.1 From 8763dd80efd19d562a97a8d5af59b85bc3678d46 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 15:33:01 +0100 Subject: Don't fetch prev_content for current_state --- synapse/storage/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index b3f2a4dfa1..56f0572f7e 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -164,7 +164,7 @@ class StateStore(SQLBaseStore): return [r[0] for r in results] event_ids = yield self.runInteraction("get_current_state", f) - events = yield self._get_events(event_ids) + events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) -- cgit 1.4.1 From 70f272f71ca399205a72deb29b0f86ff3bf23618 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 16:34:17 +0100 Subject: Don't completely drain the list --- synapse/storage/events.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 066e1aab75..6ee2e9a485 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -505,8 +505,15 @@ class EventsStore(SQLBaseStore): while True: try: with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] + tot = 0 + for j, lst in enumerate(self._event_fetch_list): + if tot > 200: + break + tot += len(lst[0]) + + event_list = self._event_fetch_list[:j+1] + self._event_fetch_list = self._event_fetch_list[j+1:] + if not event_list: if self.database_engine.single_threaded or i > 3: self._event_fetch_ongoing -= 1 -- cgit 1.4.1 From 9ff7f66a2be6d3f0542b69e784b400df349869ff Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 16:36:03 +0100 Subject: init j --- synapse/storage/events.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 6ee2e9a485..f21364606d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -506,6 +506,7 @@ class EventsStore(SQLBaseStore): try: with self._event_fetch_lock: tot = 0 + j = 0 for j, lst in enumerate(self._event_fetch_list): if tot > 200: break -- cgit 1.4.1 From 6c74fd62a0b2292b4fce9d59ea9790685e256e0a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 16:45:35 +0100 Subject: Revert limiting of fetching, it didn't help perf. --- synapse/storage/events.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index f21364606d..98a5663932 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -505,15 +505,8 @@ class EventsStore(SQLBaseStore): while True: try: with self._event_fetch_lock: - tot = 0 - j = 0 - for j, lst in enumerate(self._event_fetch_list): - if tot > 200: - break - tot += len(lst[0]) - - event_list = self._event_fetch_list[:j+1] - self._event_fetch_list = self._event_fetch_list[j+1:] + event_list = self._event_fetch_list + self._event_fetch_list = [] if not event_list: if self.database_engine.single_threaded or i > 3: -- cgit 1.4.1 From c3b37abdfd6d34ea7ff7698bd61dd37763e257b1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 15 May 2015 16:59:58 +0100 Subject: PEP8 --- synapse/storage/_base.py | 1 - synapse/storage/events.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 5d86aa5cd4..b529e0543e 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -27,7 +27,6 @@ from twisted.internet import defer from collections import namedtuple, OrderedDict -import contextlib import functools import sys import time diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 98a5663932..25fb49a28d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -629,7 +629,7 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def _fetch_events(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): + get_prev_content=False, allow_rejected=False): if not events: defer.returnValue({}) -- cgit 1.4.1 From c71176858b9d58cfbe5520ad1dac8191c005fdc9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2015 10:11:14 +0100 Subject: Newline, remove debug logging --- synapse/federation/federation_server.py | 2 -- synapse/storage/_base.py | 1 - synapse/storage/schema/delta/19/event_index.sql | 2 +- 3 files changed, 1 insertion(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 2c6488dd1b..cd79e23f4b 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -415,8 +415,6 @@ class FederationServer(FederationBase): pdu.internal_metadata.outlier = True elif min_depth and pdu.depth > min_depth: if get_missing and prevs - seen: - logger.debug("We're missing: %r", prevs-seen) - latest = yield self.store.get_latest_event_ids_in_room( pdu.room_id ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b529e0543e..d1f050394d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -15,7 +15,6 @@ import logging from synapse.api.errors import StoreError - from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_context_over_fn, LoggingContext from synapse.util.lrucache import LruCache diff --git a/synapse/storage/schema/delta/19/event_index.sql b/synapse/storage/schema/delta/19/event_index.sql index f3792817bb..3881fc9897 100644 --- a/synapse/storage/schema/delta/19/event_index.sql +++ b/synapse/storage/schema/delta/19/event_index.sql @@ -16,4 +16,4 @@ CREATE INDEX events_order_topo_stream_room ON events( topological_ordering, stream_ordering, room_id -); \ No newline at end of file +); -- cgit 1.4.1 From 4d1b6f4ad1bbae71c6c941b5951d9f9043ddace4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2015 14:03:46 +0100 Subject: Remove rejected events if we don't want rejected events --- synapse/storage/events.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 25fb49a28d..5a04d45e1e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -643,6 +643,9 @@ class EventsStore(SQLBaseStore): events, ) + if not allow_rejected: + rows[:] = [r for r in rows if not r["rejects"]] + res = yield defer.gatherResults( [ defer.maybeDeferred( -- cgit 1.4.1 From 65878a2319366b268a98dcff48eeb632ef67cc0a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2015 14:06:30 +0100 Subject: Remove unused metric --- synapse/storage/_base.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index d1f050394d..0f146998d9 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -46,7 +46,6 @@ sql_scheduling_timer = metrics.register_distribution("schedule_time") sql_query_timer = metrics.register_distribution("query_time", labels=["verb"]) sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"]) -sql_getevents_timer = metrics.register_distribution("getEvents_time", labels=["desc"]) caches_by_name = {} cache_counter = metrics.register_cache( -- cgit 1.4.1 From 165eb2dbe63a42ef5fdd947544e8d7fda0e7234f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2015 15:18:41 +0100 Subject: Comments and shuffle of functions --- synapse/storage/events.py | 80 +++++++++++++++++++++++++---------------------- 1 file changed, 42 insertions(+), 38 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5a04d45e1e..9751f024d7 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -402,6 +402,10 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def _get_events(self, event_ids, check_redacted=True, get_prev_content=False, allow_rejected=False, txn=None): + """Gets a collection of events. If `txn` is not None the we use the + current transaction to fetch events and we return a deferred that is + guarenteed to have resolved. + """ if not event_ids: defer.returnValue([]) @@ -490,16 +494,10 @@ class EventsStore(SQLBaseStore): return event_map - def _fetch_events_txn(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): - return unwrap_deferred(self._fetch_events( - txn, events, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - )) - def _do_fetch(self, conn): + """Takes a database connection and waits for requests for events from + the _event_fetch_list queue. + """ event_list = [] i = 0 while True: @@ -532,6 +530,7 @@ class EventsStore(SQLBaseStore): for r in rows } + # We only want to resolve deferreds from the main thread def fire(lst, res): for ids, d in lst: if not d.called: @@ -547,6 +546,7 @@ class EventsStore(SQLBaseStore): except Exception as e: logger.exception("do_fetch") + # We only want to resolve deferreds from the main thread def fire(evs): for _, d in evs: if not d.called: @@ -558,6 +558,10 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def _enqueue_events(self, events, check_redacted=True, get_prev_content=False, allow_rejected=False): + """Fetches events from the database using the _event_fetch_list. This + allows batch and bulk fetching of events - it allows us to fetch events + without having to create a new transaction for each request for events. + """ if not events: defer.returnValue({}) @@ -582,6 +586,9 @@ class EventsStore(SQLBaseStore): rows = yield preserve_context_over_deferred(events_d) + if not allow_rejected: + rows[:] = [r for r in rows if not r["rejects"]] + res = yield defer.gatherResults( [ self._get_event_from_row( @@ -627,49 +634,46 @@ class EventsStore(SQLBaseStore): return rows - @defer.inlineCallbacks - def _fetch_events(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): + def _fetch_events_txn(self, txn, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): if not events: - defer.returnValue({}) + return {} - if txn: - rows = self._fetch_event_rows( - txn, events, - ) - else: - rows = yield self.runInteraction( - self._fetch_event_rows, - events, - ) + rows = self._fetch_event_rows( + txn, events, + ) if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] - res = yield defer.gatherResults( - [ - defer.maybeDeferred( - self._get_event_from_row, - txn, - row["internal_metadata"], row["json"], row["redacts"], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=row["rejects"], - ) - for row in rows - ], - consumeErrors=True, - ) + res = [ + unwrap_deferred(self._get_event_from_row( + txn, + row["internal_metadata"], row["json"], row["redacts"], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=row["rejects"], + )) + for row in rows + ] - defer.returnValue({ + return { r.event_id: r for r in res - }) + } @defer.inlineCallbacks def _get_event_from_row(self, txn, internal_metadata, js, redacted, check_redacted=True, get_prev_content=False, rejected_reason=None): + """This is called when we have a row from the database that we want to + convert into an event. Depending on the given options it may do more + database ops to fill in extra information (e.g. previous content or + rejection reason.) + + `txn` may be None, and if so this creates new transactions for each + database op. + """ d = json.loads(js) internal_metadata = json.loads(internal_metadata) -- cgit 1.4.1 From 227f8ef03129a3eb9fe4e4a3f21e397b11032836 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 May 2015 13:00:57 +0100 Subject: Split out _get_event_from_row back into defer and _txn version --- synapse/storage/events.py | 68 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 57 insertions(+), 11 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 9751f024d7..b0d474b8e4 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -592,7 +592,6 @@ class EventsStore(SQLBaseStore): res = yield defer.gatherResults( [ self._get_event_from_row( - None, row["internal_metadata"], row["json"], row["redacts"], check_redacted=check_redacted, get_prev_content=get_prev_content, @@ -647,13 +646,13 @@ class EventsStore(SQLBaseStore): rows[:] = [r for r in rows if not r["rejects"]] res = [ - unwrap_deferred(self._get_event_from_row( + self._get_event_from_row_txn( txn, row["internal_metadata"], row["json"], row["redacts"], check_redacted=check_redacted, get_prev_content=get_prev_content, rejected_reason=row["rejects"], - )) + ) for row in rows ] @@ -663,17 +662,64 @@ class EventsStore(SQLBaseStore): } @defer.inlineCallbacks - def _get_event_from_row(self, txn, internal_metadata, js, redacted, + def _get_event_from_row(self, internal_metadata, js, redacted, check_redacted=True, get_prev_content=False, rejected_reason=None): - """This is called when we have a row from the database that we want to - convert into an event. Depending on the given options it may do more - database ops to fill in extra information (e.g. previous content or - rejection reason.) + d = json.loads(js) + internal_metadata = json.loads(internal_metadata) - `txn` may be None, and if so this creates new transactions for each - database op. - """ + if rejected_reason: + rejected_reason = yield self._simple_select_one_onecol( + table="rejections", + keyvalues={"event_id": rejected_reason}, + retcol="reason", + desc="_get_event_from_row", + ) + + ev = FrozenEvent( + d, + internal_metadata_dict=internal_metadata, + rejected_reason=rejected_reason, + ) + + if check_redacted and redacted: + ev = prune_event(ev) + + redaction_id = yield self._simple_select_one_onecol( + table="redactions", + keyvalues={"redacts": ev.event_id}, + retcol="event_id", + desc="_get_event_from_row", + ) + + ev.unsigned["redacted_by"] = redaction_id + # Get the redaction event. + + because = yield self.get_event( + redaction_id, + check_redacted=False + ) + + if because: + ev.unsigned["redacted_because"] = because + + if get_prev_content and "replaces_state" in ev.unsigned: + prev = yield self.get_event( + ev.unsigned["replaces_state"], + get_prev_content=False, + ) + if prev: + ev.unsigned["prev_content"] = prev.get_dict()["content"] + + self._get_event_cache.prefill( + ev.event_id, check_redacted, get_prev_content, ev + ) + + defer.returnValue(ev) + + def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, + check_redacted=True, get_prev_content=False, + rejected_reason=None): d = json.loads(js) internal_metadata = json.loads(internal_metadata) -- cgit 1.4.1 From f407cbd2f1f034ee2f7be8a9464aa234266420cc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 May 2015 13:02:01 +0100 Subject: PEP8 --- synapse/storage/events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index b0d474b8e4..3f7f546bdd 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -718,8 +718,8 @@ class EventsStore(SQLBaseStore): defer.returnValue(ev) def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False, - rejected_reason=None): + check_redacted=True, get_prev_content=False, + rejected_reason=None): d = json.loads(js) internal_metadata = json.loads(internal_metadata) -- cgit 1.4.1 From ab45e12d31bf8238c3cb1887d18d89ae2addbb7a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 May 2015 13:07:19 +0100 Subject: Make not return a deferred _get_event_from_row_txn --- synapse/storage/events.py | 25 +++++-------------------- 1 file changed, 5 insertions(+), 20 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3f7f546bdd..4aa4e7ab15 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -723,23 +723,8 @@ class EventsStore(SQLBaseStore): d = json.loads(js) internal_metadata = json.loads(internal_metadata) - def select(txn, *args, **kwargs): - if txn: - return self._simple_select_one_onecol_txn(txn, *args, **kwargs) - else: - return self._simple_select_one_onecol( - *args, - desc="_get_event_from_row", **kwargs - ) - - def get_event(txn, *args, **kwargs): - if txn: - return self._get_event_txn(txn, *args, **kwargs) - else: - return self.get_event(*args, **kwargs) - if rejected_reason: - rejected_reason = yield select( + rejected_reason = self._simple_select_one_onecol_txn( txn, table="rejections", keyvalues={"event_id": rejected_reason}, @@ -755,7 +740,7 @@ class EventsStore(SQLBaseStore): if check_redacted and redacted: ev = prune_event(ev) - redaction_id = yield select( + redaction_id = self._simple_select_one_onecol_txn( txn, table="redactions", keyvalues={"redacts": ev.event_id}, @@ -765,7 +750,7 @@ class EventsStore(SQLBaseStore): ev.unsigned["redacted_by"] = redaction_id # Get the redaction event. - because = yield get_event( + because = self._get_event_txn( txn, redaction_id, check_redacted=False @@ -775,7 +760,7 @@ class EventsStore(SQLBaseStore): ev.unsigned["redacted_because"] = because if get_prev_content and "replaces_state" in ev.unsigned: - prev = yield get_event( + prev = self._get_event_txn( txn, ev.unsigned["replaces_state"], get_prev_content=False, @@ -787,7 +772,7 @@ class EventsStore(SQLBaseStore): ev.event_id, check_redacted, get_prev_content, ev ) - defer.returnValue(ev) + return ev def _parse_events(self, rows): return self.runInteraction( -- cgit 1.4.1 From 9118a928621ae31a951412b867179f5e0a627976 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 20 May 2015 13:27:16 +0100 Subject: Split up _get_events into defer and txn versions --- synapse/storage/events.py | 59 ++++++++++++++++++++++++++++------------------- 1 file changed, 35 insertions(+), 24 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4aa4e7ab15..656e57b5c6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,7 +19,6 @@ from twisted.internet import defer, reactor from synapse.events import FrozenEvent from synapse.events.utils import prune_event -from synapse.util import unwrap_deferred from synapse.util.logcontext import preserve_context_over_deferred from synapse.util.logutils import log_function @@ -401,11 +400,7 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def _get_events(self, event_ids, check_redacted=True, - get_prev_content=False, allow_rejected=False, txn=None): - """Gets a collection of events. If `txn` is not None the we use the - current transaction to fetch events and we return a deferred that is - guarenteed to have resolved. - """ + get_prev_content=False, allow_rejected=False): if not event_ids: defer.returnValue([]) @@ -424,21 +419,12 @@ class EventsStore(SQLBaseStore): if e_id in event_map and event_map[e_id] ]) - if not txn: - missing_events = yield self._enqueue_events( - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - else: - missing_events = self._fetch_events_txn( - txn, - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) + missing_events = yield self._enqueue_events( + missing_events_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) event_map.update(missing_events) @@ -449,13 +435,38 @@ class EventsStore(SQLBaseStore): def _get_events_txn(self, txn, event_ids, check_redacted=True, get_prev_content=False, allow_rejected=False): - return unwrap_deferred(self._get_events( + if not event_ids: + return [] + + event_map = self._get_events_from_cache( event_ids, check_redacted=check_redacted, get_prev_content=get_prev_content, allow_rejected=allow_rejected, - txn=txn, - )) + ) + + missing_events_ids = [e for e in event_ids if e not in event_map] + + if not missing_events_ids: + return [ + event_map[e_id] for e_id in event_ids + if e_id in event_map and event_map[e_id] + ] + + missing_events = self._fetch_events_txn( + txn, + missing_events_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + event_map.update(missing_events) + + return [ + event_map[e_id] for e_id in event_ids + if e_id in event_map and event_map[e_id] + ] def _invalidate_get_event_cache(self, event_id): for check_redacted in (False, True): -- cgit 1.4.1 From 80a167b1f032ae9bcb75bdb20c2112a779dda516 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2015 11:18:22 +0100 Subject: Add comments --- synapse/storage/events.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 656e57b5c6..3d798f5fa2 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -35,6 +35,16 @@ import simplejson as json logger = logging.getLogger(__name__) +# These values are used in the `enqueus_event` and `_do_fetch` methods to +# control how we batch/bulk fetch events from the database. +# The values are plucked out of thing air to make initial sync run faster +# on jki.re +# TODO: Make these configurable. +EVENT_QUEUE_THREADS = 3 # Max number of threads that will fetch events +EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for events +EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for events + + class EventsStore(SQLBaseStore): @defer.inlineCallbacks @log_function @@ -518,11 +528,12 @@ class EventsStore(SQLBaseStore): self._event_fetch_list = [] if not event_list: - if self.database_engine.single_threaded or i > 3: + single_threaded = self.database_engine.single_threaded + if single_threaded or i > EVENT_QUEUE_ITERATIONS: self._event_fetch_ongoing -= 1 return else: - self._event_fetch_lock.wait(0.1) + self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) i += 1 continue i = 0 @@ -584,7 +595,7 @@ class EventsStore(SQLBaseStore): self._event_fetch_lock.notify() - if self._event_fetch_ongoing < 3: + if self._event_fetch_ongoing < EVENT_QUEUE_THREADS: self._event_fetch_ongoing += 1 should_start = True else: -- cgit 1.4.1 From ac5f2bf9db9f37d2cb076cfa3a0912ecb948f508 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2015 14:50:57 +0100 Subject: s/for events/for requests for events/ --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3d798f5fa2..84a799bcff 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -42,7 +42,7 @@ logger = logging.getLogger(__name__) # TODO: Make these configurable. EVENT_QUEUE_THREADS = 3 # Max number of threads that will fetch events EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for events -EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for events +EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events class EventsStore(SQLBaseStore): -- cgit 1.4.1 From 27e4b45c0692489a11e4be2b05f3fcb99b3c7489 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2015 14:52:23 +0100 Subject: s/for events/for requests for events/ --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 84a799bcff..74946eac3e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -41,7 +41,7 @@ logger = logging.getLogger(__name__) # on jki.re # TODO: Make these configurable. EVENT_QUEUE_THREADS = 3 # Max number of threads that will fetch events -EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for events +EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events -- cgit 1.4.1