From f3788e3c7881de25c7d699bb9940b3cbd4dc3682 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 17 Dec 2014 23:37:08 +0000 Subject: Test some ideas that might help performance a bit --- synapse/storage/_base.py | 34 ++++++++++++++++++++++++++++------ synapse/storage/roommember.py | 8 +------- synapse/storage/state.py | 9 +-------- 3 files changed, 30 insertions(+), 21 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e0d97f440b..a6e2e0e2ef 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -440,14 +440,29 @@ class SQLBaseStore(object): ) def _get_events_txn(self, txn, event_ids): - events = [] - for e_id in event_ids: - ev = self._get_event_txn(txn, e_id) + if not event_ids: + return [] - if ev: - events.append(ev) + if len(event_ids) > 50: + events = [] + n = 50 + for e_ids in [event_ids[i:i + n] for i in range(0, len(event_ids), n)]: + events.extend(self._get_events_txn(txn, e_ids)) + return events - return events + where_clause = " OR ".join(["e.event_id = ?" for _ in event_ids]) + + sql = ( + "SELECT internal_metadata, json, r.event_id FROM event_json as e " + "LEFT JOIN redactions as r ON e.event_id = r.redacts " + "WHERE %s" + ) % (where_clause,) + + txn.execute(sql, event_ids) + + res = txn.fetchall() + + return [self._get_event_from_row_txn(txn, *r) for r in res] def _get_event_txn(self, txn, event_id, check_redacted=True, get_prev_content=True): @@ -467,6 +482,13 @@ class SQLBaseStore(object): internal_metadata, js, redacted = res + return self._get_event_from_row_txn( + txn, internal_metadata, js, redacted, check_redacted=check_redacted, + get_prev_content=get_prev_content, + ) + + def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, + check_redacted=True, get_prev_content=True): d = json.loads(js) internal_metadata = json.loads(internal_metadata) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 05b275663e..4e416c50b1 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -183,20 +183,14 @@ class RoomMemberStore(SQLBaseStore): ) def _get_members_query_txn(self, txn, where_clause, where_values): - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = e.event_id " - "LIMIT 1" - ) - sql = ( - "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " + "SELECT e.* FROM events as e " "INNER JOIN room_memberships as m " "ON e.event_id = m.event_id " "INNER JOIN current_state_events as c " "ON m.event_id = c.event_id " "WHERE %(where)s " ) % { - "redacted": del_sql, "where": where_clause, } diff --git a/synapse/storage/state.py b/synapse/storage/state.py index afe3e5edea..ab80909712 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -62,14 +62,7 @@ class StateStore(SQLBaseStore): keyvalues={"state_group": group}, retcol="event_id", ) - state = [] - for state_id in state_ids: - s = self._get_events_txn( - txn, - [state_id], - ) - if s: - state.extend(s) + state = self._get_events_txn(txn, state_ids) res[group] = state -- cgit 1.4.1 From f0128f9600c59fbcb993bccbbbb32486009694d7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jan 2015 10:55:43 +0000 Subject: Add RoomMemberStore.get_users_in_room, so that we can get the list of joined users without having to retrieve the full events --- synapse/handlers/room.py | 13 +++++-------- synapse/storage/roommember.py | 13 +++++++++++++ 2 files changed, 18 insertions(+), 8 deletions(-) (limited to 'synapse/storage') diff --git 
a/synapse/handlers/room.py b/synapse/handlers/room.py index deefc3c11e..5e5d95add1 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -245,14 +245,12 @@ class RoomMemberHandler(BaseHandler): self.distributor.declare("user_left_room") @defer.inlineCallbacks - def get_room_members(self, room_id, membership=Membership.JOIN): + def get_room_members(self, room_id): hs = self.hs - memberships = yield self.store.get_room_members( - room_id=room_id, membership=membership - ) + users = yield self.store.get_users_in_room(room_id) - defer.returnValue([hs.parse_userid(m.user_id) for m in memberships]) + defer.returnValue([hs.parse_userid(u) for u in users]) @defer.inlineCallbacks def fetch_room_distributions_into(self, room_id, localusers=None, @@ -531,11 +529,10 @@ class RoomListHandler(BaseHandler): def get_public_room_list(self): chunk = yield self.store.get_rooms(is_public=True) for room in chunk: - joined_members = yield self.store.get_room_members( + joined_users = yield self.store.get_users_in_room( room_id=room["room_id"], - membership=Membership.JOIN ) - room["num_joined_members"] = len(joined_members) + room["num_joined_members"] = len(joined_users) # FIXME (erikj): START is no longer a valid value defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 4e416c50b1..4921561fc3 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -123,6 +123,19 @@ class RoomMemberStore(SQLBaseStore): else: return None + def get_users_in_room(self, room_id): + def f(txn): + sql = ( + "SELECT m.user_id FROM room_memberships as m" + " INNER JOIN current_state_events as c" + " ON m.event_id = c.event_id" + " WHERE m.membership = ? AND m.room_id = ?" + ) + + txn.execute(sql, (Membership.JOIN, room_id)) + return [r[0] for r in txn.fetchall()] + return self.runInteraction("get_users_in_room", f) + def get_room_members(self, room_id, membership=None): """Retrieve the current room member list for a room. -- cgit 1.4.1 From d7e8ea67b374d3b006f7277de531302abc410e57 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jan 2015 11:18:02 +0000 Subject: Reformat --- synapse/storage/_base.py | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index a6e2e0e2ef..a30b0bc410 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -440,15 +440,16 @@ class SQLBaseStore(object): ) def _get_events_txn(self, txn, event_ids): - if not event_ids: - return [] + if not event_ids: + return [] - if len(event_ids) > 50: - events = [] - n = 50 - for e_ids in [event_ids[i:i + n] for i in range(0, len(event_ids), n)]: - events.extend(self._get_events_txn(txn, e_ids)) - return events + if len(event_ids) > 50: + events = [] + n = 50 + split = [event_ids[i:i + n] for i in range(0, len(event_ids), n)] + for e_ids in split: + events.extend(self._get_events_txn(txn, e_ids)) + return events where_clause = " OR ".join(["e.event_id = ?" 
for _ in event_ids]) @@ -482,13 +483,13 @@ class SQLBaseStore(object): internal_metadata, js, redacted = res - return self._get_event_from_row_txn( - txn, internal_metadata, js, redacted, check_redacted=check_redacted, - get_prev_content=get_prev_content, - ) + return self._get_event_from_row_txn( + txn, internal_metadata, js, redacted, check_redacted=check_redacted, + get_prev_content=get_prev_content, + ) def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=True): + check_redacted=True, get_prev_content=True): d = json.loads(js) internal_metadata = json.loads(internal_metadata) -- cgit 1.4.1 From 753126b8ccb56dc6539ce95758f3e87fe181064d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jan 2015 11:18:12 +0000 Subject: Add some debug logging --- synapse/storage/state.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index ab80909712..9aeb0b4063 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -15,6 +15,10 @@ from ._base import SQLBaseStore +import logging + +logger = logging.getLogger(__name__) + class StateStore(SQLBaseStore): """ Keeps track of the state at a given event. @@ -54,6 +58,8 @@ class StateStore(SQLBaseStore): if group: groups.add(group) + logger.debug("Got groups: %s", groups) + res = {} for group in groups: state_ids = self._simple_select_onecol_txn( @@ -62,6 +68,12 @@ class StateStore(SQLBaseStore): keyvalues={"state_group": group}, retcol="event_id", ) + + logger.debug( + "Got %d events for group %s", + len(state_ids), group + ) + state = self._get_events_txn(txn, state_ids) res[group] = state -- cgit 1.4.1 From f4ea78e9e2578f0d0ba4345b3b45390e905438e3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jan 2015 11:24:18 +0000 Subject: More debug logging --- synapse/storage/_base.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index a30b0bc410..de08c78ed0 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -443,6 +443,8 @@ class SQLBaseStore(object): if not event_ids: return [] + logger.debug("_get_events_txn called with %d events", len(event_ids)) + if len(event_ids) > 50: events = [] n = 50 @@ -451,6 +453,8 @@ class SQLBaseStore(object): events.extend(self._get_events_txn(txn, e_ids)) return events + logger.debug("_get_events_txn Fetching %d events", len(event_ids)) + where_clause = " OR ".join(["e.event_id = ?" for _ in event_ids]) sql = ( -- cgit 1.4.1 From 3e26720e0574393dc8076b3d4099e16213ce2e6d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jan 2015 11:26:58 +0000 Subject: Temporarily turn off 'redacted_because' and 'prev_content' keys --- synapse/storage/_base.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index de08c78ed0..d636556430 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -499,6 +499,8 @@ class SQLBaseStore(object): ev = FrozenEvent(d, internal_metadata_dict=internal_metadata) + return ev + if check_redacted and redacted: ev = prune_event(ev) -- cgit 1.4.1 From 98933e3db6d43dcb3c8c21d0b65e2647bc3fb303 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jan 2015 13:03:23 +0000 Subject: Only fetch prev_content when a client is streaming/paginating. Use transactions for event streams. 
--- synapse/storage/_base.py | 32 +++++++--- synapse/storage/stream.py | 146 ++++++++++++++++++++++------------------------ 2 files changed, 92 insertions(+), 86 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index d636556430..9702ab4f43 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -434,12 +434,15 @@ class SQLBaseStore(object): return self.runInteraction("_simple_max_id", func) - def _get_events(self, event_ids): + def _get_events(self, event_ids, check_redacted=True, + get_prev_content=False): return self.runInteraction( - "_get_events", self._get_events_txn, event_ids + "_get_events", self._get_events_txn, event_ids, + check_redacted=check_redacted, get_prev_content=get_prev_content, ) - def _get_events_txn(self, txn, event_ids): + def _get_events_txn(self, txn, event_ids, check_redacted=True, + get_prev_content=False): if not event_ids: return [] @@ -450,7 +453,13 @@ class SQLBaseStore(object): n = 50 split = [event_ids[i:i + n] for i in range(0, len(event_ids), n)] for e_ids in split: - events.extend(self._get_events_txn(txn, e_ids)) + events.extend( + self._get_events_txn( + txn, e_ids, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + ) + ) return events logger.debug("_get_events_txn Fetching %d events", len(event_ids)) @@ -467,10 +476,17 @@ class SQLBaseStore(object): res = txn.fetchall() - return [self._get_event_from_row_txn(txn, *r) for r in res] + return [ + self._get_event_from_row_txn( + txn, r[0], r[1], r[2], + check_redacted=check_redacted, + get_prev_content=get_prev_content, + ) + for r in res + ] def _get_event_txn(self, txn, event_id, check_redacted=True, - get_prev_content=True): + get_prev_content=False): sql = ( "SELECT internal_metadata, json, r.event_id FROM event_json as e " "LEFT JOIN redactions as r ON e.event_id = r.redacts " @@ -493,14 +509,12 @@ class SQLBaseStore(object): ) def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=True): + check_redacted=True, get_prev_content=False): d = json.loads(js) internal_metadata = json.loads(internal_metadata) ev = FrozenEvent(d, internal_metadata_dict=internal_metadata) - return ev - if check_redacted and redacted: ev = prune_event(ev) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3405cb365e..c51f489451 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -137,12 +137,12 @@ class StreamStore(SQLBaseStore): with_feedback=with_feedback, ) - @defer.inlineCallbacks @log_function def get_room_events_stream(self, user_id, from_key, to_key, room_id, limit=0, with_feedback=False): # TODO (erikj): Handle compressed feedback + current_room_membership_sql = ( "SELECT m.room_id FROM room_memberships as m " "INNER JOIN current_state_events as c ON m.event_id = c.event_id " @@ -157,11 +157,6 @@ class StreamStore(SQLBaseStore): "WHERE m.user_id = ? 
" ) - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = e.event_id " - "LIMIT 1" - ) - if limit: limit = max(limit, MAX_STREAM_SIZE) else: @@ -172,38 +167,42 @@ class StreamStore(SQLBaseStore): to_id = _parse_stream_token(to_key) if from_key == to_key: - defer.returnValue(([], to_key)) - return + return defer.succeed(([], to_key)) sql = ( - "SELECT *, (%(redacted)s) AS redacted FROM events AS e WHERE " + "SELECT e.event_id, e.stream_ordering FROM events AS e WHERE " "(e.outlier = 0 AND (room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " "AND e.stream_ordering > ? AND e.stream_ordering <= ? " "ORDER BY stream_ordering ASC LIMIT %(limit)d " ) % { - "redacted": del_sql, "current": current_room_membership_sql, "invites": membership_sql, "limit": limit } - rows = yield self._execute_and_decode( - sql, - user_id, user_id, from_id, to_id - ) + def f(txn): + txn.execute(sql, (user_id, user_id, from_id, to_id,)) - ret = yield self._parse_events(rows) + rows = self.cursor_to_dict(txn) - if rows: - key = "s%d" % max([r["stream_ordering"] for r in rows]) - else: - # Assume we didn't get anything because there was nothing to get. - key = to_key + ret = self._get_events_txn( + txn, + [r["event_id"] for r in rows], + get_prev_content=True + ) + + if rows: + key = "s%d" % max([r["stream_ordering"] for r in rows]) + else: + # Assume we didn't get anything because there was nothing to + # get. + key = to_key - defer.returnValue((ret, key)) + return ret, key + + return self.runInteraction("get_room_events_stream", f) - @defer.inlineCallbacks @log_function def paginate_room_events(self, room_id, from_key, to_key=None, direction='b', limit=-1, @@ -221,7 +220,9 @@ class StreamStore(SQLBaseStore): bounds = _get_token_bound(from_key, from_comp) if to_key: - bounds = "%s AND %s" % (bounds, _get_token_bound(to_key, to_comp)) + bounds = "%s AND %s" % ( + bounds, _get_token_bound(to_key, to_comp) + ) if int(limit) > 0: args.append(int(limit)) @@ -229,87 +230,78 @@ class StreamStore(SQLBaseStore): else: limit_str = "" - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = events.event_id " - "LIMIT 1" - ) - sql = ( - "SELECT *, (%(redacted)s) AS redacted FROM events" + "SELECT * FROM events" " WHERE outlier = 0 AND room_id = ? AND %(bounds)s" " ORDER BY topological_ordering %(order)s," " stream_ordering %(order)s %(limit)s" ) % { - "redacted": del_sql, "bounds": bounds, "order": order, "limit": limit_str } - rows = yield self._execute_and_decode( - sql, - *args - ) - - if rows: - topo = rows[-1]["topological_ordering"] - toke = rows[-1]["stream_ordering"] - if direction == 'b': - topo -= 1 - toke -= 1 - next_token = "t%s-%s" % (topo, toke) - else: - # TODO (erikj): We should work out what to do here instead. - next_token = to_key if to_key else from_key + def f(txn): + txn.execute(sql, args) + + rows = self.cursor_to_dict(txn) + + if rows: + topo = rows[-1]["topological_ordering"] + toke = rows[-1]["stream_ordering"] + if direction == 'b': + topo -= 1 + toke -= 1 + next_token = "t%s-%s" % (topo, toke) + else: + # TODO (erikj): We should work out what to do here instead. 
+ next_token = to_key if to_key else from_key + + events = self._get_events_txn( + txn, + [r["event_id"] for r in rows], + get_prev_content=True + ) - events = yield self._parse_events(rows) + return events, next_token, - defer.returnValue( - ( - events, - next_token - ) - ) + return self.runInteraction("paginate_room_events", f) - @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, end_token, with_feedback=False): # TODO (erikj): Handle compressed feedback - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = events.event_id " - "LIMIT 1" - ) - sql = ( - "SELECT *, (%(redacted)s) AS redacted FROM events " + "SELECT * FROM events " "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 " "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? " - ) % { - "redacted": del_sql, - } - - rows = yield self._execute_and_decode( - sql, - room_id, end_token, limit ) - rows.reverse() # As we selected with reverse ordering + def f(txn): + txn.execute(sql, (room_id, end_token, limit,)) - if rows: - topo = rows[0]["topological_ordering"] - toke = rows[0]["stream_ordering"] - start_token = "t%s-%s" % (topo, toke) + rows = self.cursor_to_dict(txn) - token = (start_token, end_token) - else: - token = (end_token, end_token) + rows.reverse() # As we selected with reverse ordering - events = yield self._parse_events(rows) + if rows: + topo = rows[0]["topological_ordering"] + toke = rows[0]["stream_ordering"] + start_token = "t%s-%s" % (topo, toke) + + token = (start_token, end_token) + else: + token = (end_token, end_token) + + events = self._get_events_txn( + txn, + [r["event_id"] for r in rows], + get_prev_content=True + ) - ret = (events, token) + return events, token - defer.returnValue(ret) + return self.runInteraction("get_recent_events_for_room", f) def get_room_events_max_id(self): return self.runInteraction( -- cgit 1.4.1 From 52d85190081044b9fbaf24869d652d3fe3c23e5d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jan 2015 13:10:27 +0000 Subject: Don't do batching when getting events. --- synapse/storage/_base.py | 39 ++++----------------------------------- 1 file changed, 4 insertions(+), 35 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 9702ab4f43..9687222e72 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -446,43 +446,12 @@ class SQLBaseStore(object): if not event_ids: return [] - logger.debug("_get_events_txn called with %d events", len(event_ids)) - - if len(event_ids) > 50: - events = [] - n = 50 - split = [event_ids[i:i + n] for i in range(0, len(event_ids), n)] - for e_ids in split: - events.extend( - self._get_events_txn( - txn, e_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - ) - ) - return events - - logger.debug("_get_events_txn Fetching %d events", len(event_ids)) - - where_clause = " OR ".join(["e.event_id = ?" 
for _ in event_ids]) - - sql = ( - "SELECT internal_metadata, json, r.event_id FROM event_json as e " - "LEFT JOIN redactions as r ON e.event_id = r.redacts " - "WHERE %s" - ) % (where_clause,) - - txn.execute(sql, event_ids) - - res = txn.fetchall() - return [ - self._get_event_from_row_txn( - txn, r[0], r[1], r[2], - check_redacted=check_redacted, - get_prev_content=get_prev_content, + self._get_event_txn( + txn, event_id, + check_redacted=check_redacted, get_prev_content=get_prev_content ) - for r in res + for event_id in event_ids ] def _get_event_txn(self, txn, event_id, check_redacted=True, -- cgit 1.4.1 From 12819d5082ac73adc309428770c9270ba378c6e2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jan 2015 13:12:30 +0000 Subject: Remove debug lines --- synapse/storage/state.py | 7 ------- 1 file changed, 7 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 9aeb0b4063..fd6f1e3b00 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -58,8 +58,6 @@ class StateStore(SQLBaseStore): if group: groups.add(group) - logger.debug("Got groups: %s", groups) - res = {} for group in groups: state_ids = self._simple_select_onecol_txn( @@ -69,11 +67,6 @@ class StateStore(SQLBaseStore): retcol="event_id", ) - logger.debug( - "Got %d events for group %s", - len(state_ids), group - ) - state = self._get_events_txn(txn, state_ids) res[group] = state -- cgit 1.4.1 From af1c7c7808d297711e4b76b862c41a5ec2ca3a9a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jan 2015 13:13:17 +0000 Subject: PEP8 --- synapse/storage/_base.py | 6 ++++-- synapse/storage/stream.py | 1 - 2 files changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 9687222e72..e799ac6c5d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -449,7 +449,8 @@ class SQLBaseStore(object): return [ self._get_event_txn( txn, event_id, - check_redacted=check_redacted, get_prev_content=get_prev_content + check_redacted=check_redacted, + get_prev_content=get_prev_content ) for event_id in event_ids ] @@ -473,7 +474,8 @@ class SQLBaseStore(object): internal_metadata, js, redacted = res return self._get_event_from_row_txn( - txn, internal_metadata, js, redacted, check_redacted=check_redacted, + txn, internal_metadata, js, redacted, + check_redacted=check_redacted, get_prev_content=get_prev_content, ) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index c51f489451..bd3a411ead 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -142,7 +142,6 @@ class StreamStore(SQLBaseStore): limit=0, with_feedback=False): # TODO (erikj): Handle compressed feedback - current_room_membership_sql = ( "SELECT m.room_id FROM room_memberships as m " "INNER JOIN current_state_events as c ON m.event_id = c.event_id " -- cgit 1.4.1 From adb04b1e572d13b75541f4684aac3683e94d70b8 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 6 Jan 2015 13:21:39 +0000 Subject: Update copyright notices --- scripts/copyrighter.pl | 2 +- synapse/__init__.py | 2 +- synapse/api/__init__.py | 2 +- synapse/api/auth.py | 2 +- synapse/api/constants.py | 2 +- synapse/api/errors.py | 2 +- synapse/api/ratelimiting.py | 2 +- synapse/api/urls.py | 2 +- synapse/app/__init__.py | 2 +- synapse/app/homeserver.py | 2 +- synapse/app/synctl.py | 2 +- synapse/config/__init__.py | 2 +- synapse/config/_base.py | 2 +- synapse/config/captcha.py | 2 +- 
synapse/config/database.py | 2 +- synapse/config/email.py | 2 +- synapse/config/homeserver.py | 2 +- synapse/config/logger.py | 2 +- synapse/config/ratelimiting.py | 2 +- synapse/config/repository.py | 2 +- synapse/config/server.py | 2 +- synapse/config/tls.py | 2 +- synapse/config/voip.py | 2 +- synapse/crypto/__init__.py | 2 +- synapse/crypto/context_factory.py | 2 +- synapse/crypto/event_signing.py | 2 +- synapse/crypto/keyclient.py | 2 +- synapse/crypto/keyring.py | 2 +- synapse/events/__init__.py | 2 +- synapse/events/builder.py | 2 +- synapse/events/snapshot.py | 2 +- synapse/events/utils.py | 2 +- synapse/events/validator.py | 2 +- synapse/federation/__init__.py | 2 +- synapse/federation/persistence.py | 2 +- synapse/federation/replication.py | 2 +- synapse/federation/transport.py | 2 +- synapse/federation/units.py | 2 +- synapse/handlers/__init__.py | 2 +- synapse/handlers/_base.py | 2 +- synapse/handlers/admin.py | 2 +- synapse/handlers/directory.py | 2 +- synapse/handlers/events.py | 2 +- synapse/handlers/federation.py | 2 +- synapse/handlers/login.py | 2 +- synapse/handlers/message.py | 2 +- synapse/handlers/presence.py | 2 +- synapse/handlers/profile.py | 2 +- synapse/handlers/register.py | 2 +- synapse/handlers/room.py | 2 +- synapse/handlers/typing.py | 2 +- synapse/http/__init__.py | 2 +- synapse/http/agent_name.py | 2 +- synapse/http/client.py | 2 +- synapse/http/endpoint.py | 2 +- synapse/http/matrixfederationclient.py | 2 +- synapse/http/server.py | 4 ++-- synapse/http/server_key_resource.py | 2 +- synapse/media/v0/content_repository.py | 2 +- synapse/media/v1/__init__.py | 16 +++++++++++++++- synapse/media/v1/base_resource.py | 2 +- synapse/media/v1/download_resource.py | 2 +- synapse/media/v1/filepath.py | 2 +- synapse/media/v1/media_repository.py | 2 +- synapse/media/v1/thumbnail_resource.py | 2 +- synapse/media/v1/thumbnailer.py | 2 +- synapse/media/v1/upload_resource.py | 2 +- synapse/notifier.py | 2 +- synapse/rest/__init__.py | 2 +- synapse/rest/admin.py | 2 +- synapse/rest/base.py | 2 +- synapse/rest/directory.py | 2 +- synapse/rest/events.py | 2 +- synapse/rest/initial_sync.py | 2 +- synapse/rest/login.py | 2 +- synapse/rest/presence.py | 2 +- synapse/rest/profile.py | 2 +- synapse/rest/register.py | 2 +- synapse/rest/room.py | 2 +- synapse/rest/transactions.py | 2 +- synapse/rest/voip.py | 2 +- synapse/server.py | 2 +- synapse/state.py | 2 +- synapse/storage/__init__.py | 2 +- synapse/storage/_base.py | 2 +- synapse/storage/directory.py | 2 +- synapse/storage/event_federation.py | 2 +- synapse/storage/feedback.py | 2 +- synapse/storage/keys.py | 2 +- synapse/storage/media_repository.py | 2 +- synapse/storage/presence.py | 2 +- synapse/storage/profile.py | 2 +- synapse/storage/registration.py | 2 +- synapse/storage/room.py | 2 +- synapse/storage/roommember.py | 2 +- synapse/storage/schema/delta/v2.sql | 2 +- synapse/storage/schema/delta/v3.sql | 2 +- synapse/storage/schema/delta/v4.sql | 14 ++++++++++++++ synapse/storage/schema/delta/v5.sql | 14 ++++++++++++++ synapse/storage/schema/delta/v6.sql | 2 +- synapse/storage/schema/delta/v8.sql | 2 +- synapse/storage/schema/delta/v9.sql | 2 +- synapse/storage/schema/event_edges.sql | 14 ++++++++++++++ synapse/storage/schema/event_signatures.sql | 2 +- synapse/storage/schema/im.sql | 2 +- synapse/storage/schema/keys.sql | 2 +- synapse/storage/schema/media_repository.sql | 2 +- synapse/storage/schema/presence.sql | 2 +- synapse/storage/schema/profiles.sql | 2 +- synapse/storage/schema/redactions.sql | 14 ++++++++++++++ 
synapse/storage/schema/room_aliases.sql | 2 +- synapse/storage/schema/state.sql | 2 +- synapse/storage/schema/transactions.sql | 2 +- synapse/storage/schema/users.sql | 2 +- synapse/storage/signatures.py | 2 +- synapse/storage/state.py | 2 +- synapse/storage/stream.py | 2 +- synapse/storage/transactions.py | 2 +- synapse/streams/__init__.py | 2 +- synapse/streams/config.py | 2 +- synapse/streams/events.py | 2 +- synapse/types.py | 2 +- synapse/util/__init__.py | 2 +- synapse/util/async.py | 2 +- synapse/util/distributor.py | 2 +- synapse/util/emailutils.py | 2 +- synapse/util/frozenutils.py | 2 +- synapse/util/jsonobject.py | 2 +- synapse/util/lockutils.py | 2 +- synapse/util/logcontext.py | 14 ++++++++++++++ synapse/util/logutils.py | 2 +- synapse/util/stringutils.py | 2 +- 132 files changed, 212 insertions(+), 128 deletions(-) (limited to 'synapse/storage') diff --git a/scripts/copyrighter.pl b/scripts/copyrighter.pl index 7c03ef21fc..a913d74c8d 100755 --- a/scripts/copyrighter.pl +++ b/scripts/copyrighter.pl @@ -14,7 +14,7 @@ # limitations under the License. $copyright = < Date: Tue, 6 Jan 2015 14:37:00 +0000 Subject: We don't need the full events for get_rooms_for_user_where_membership_is --- synapse/handlers/profile.py | 10 +++++----- synapse/storage/roommember.py | 36 +++++++++++++++++++++++++++++++----- 2 files changed, 36 insertions(+), 10 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 3f11e2dcf4..8d4d44150d 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError, CodeMessageException -from synapse.api.constants import Membership +from synapse.api.constants import EventTypes, Membership from synapse.util.logcontext import PreserveLoggingContext from ._base import BaseHandler @@ -203,7 +203,7 @@ class ProfileHandler(BaseHandler): for j in joins: content = { - "membership": j.content["membership"], + "membership": Membership.JOIN, } yield self.distributor.fire( @@ -212,9 +212,9 @@ class ProfileHandler(BaseHandler): msg_handler = self.hs.get_handlers().message_handler yield msg_handler.create_and_send_event({ - "type": j.type, + "type": EventTypes.Member, "room_id": j.room_id, - "state_key": j.state_key, + "state_key": user.to_string(), "content": content, - "sender": j.state_key + "sender": user.to_string() }, ratelimit=False) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 4921561fc3..c495fab919 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -15,6 +15,8 @@ from twisted.internet import defer +from collections import namedtuple + from ._base import SQLBaseStore from synapse.api.constants import Membership @@ -24,6 +26,12 @@ import logging logger = logging.getLogger(__name__) +RoomsForUser = namedtuple( + "RoomsForUser", + ("room_id", "sender", "membership") +) + + class RoomMemberStore(SQLBaseStore): def _store_room_member_txn(self, txn, event): @@ -163,19 +171,37 @@ class RoomMemberStore(SQLBaseStore): membership_list (list): A list of synapse.api.constants.Membership values which the user must be in. Returns: - A list of RoomMemberEvent objects + A list of dictionary objects, with room_id, membership and sender + defined. """ if not membership_list: return defer.succeed(None) - args = [user_id] - args.extend(membership_list) - where_clause = "user_id = ? AND (%s)" % ( " OR ".join(["membership = ?" 
for _ in membership_list]), ) - return self._get_members_query(where_clause, args) + args = [user_id] + args.extend(membership_list) + + def f(txn): + sql = ( + "SELECT m.room_id, m.sender, m.membership" + " FROM room_memberships as m" + " INNER JOIN current_state_events as c" + " ON m.event_id = c.event_id" + " WHERE %s" + ) % (where_clause,) + + txn.execute(sql, args) + return [ + RoomsForUser(**r) for r in self.cursor_to_dict(txn) + ] + + return self.runInteraction( + "get_rooms_for_user_where_membership_is", + f + ) def get_joined_hosts_for_room(self, room_id): return self._simple_select_onecol( -- cgit 1.4.1 From 96707ed7185fc5c1668c8a31b62c67bdf39ed777 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jan 2015 14:44:27 +0000 Subject: Name 'user_rooms_intersect' transaction --- synapse/storage/roommember.py | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index c495fab919..11f8f78773 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -239,26 +239,28 @@ class RoomMemberStore(SQLBaseStore): results = self._parse_events_txn(txn, rows) return results - @defer.inlineCallbacks def user_rooms_intersect(self, user_id_list): """ Checks whether all the users whose IDs are given in a list share a room. """ - user_list_clause = " OR ".join(["m.user_id = ?"] * len(user_id_list)) - sql = ( - "SELECT m.room_id FROM room_memberships as m " - "INNER JOIN current_state_events as c " - "ON m.event_id = c.event_id " - "WHERE m.membership = 'join' " - "AND (%(clause)s) " - # TODO(paul): We've got duplicate rows in the database somewhere - # so we have to DISTINCT m.user_id here - "GROUP BY m.room_id HAVING COUNT(DISTINCT m.user_id) = ?" - ) % {"clause": user_list_clause} + def interaction(txn): + user_list_clause = " OR ".join(["m.user_id = ?"] * len(user_id_list)) + sql = ( + "SELECT m.room_id FROM room_memberships as m " + "INNER JOIN current_state_events as c " + "ON m.event_id = c.event_id " + "WHERE m.membership = 'join' " + "AND (%(clause)s) " + # TODO(paul): We've got duplicate rows in the database somewhere + # so we have to DISTINCT m.user_id here + "GROUP BY m.room_id HAVING COUNT(DISTINCT m.user_id) = ?" 
+ ) % {"clause": user_list_clause} + + args = list(user_id_list) + args.append(len(user_id_list)) - args = list(user_id_list) - args.append(len(user_id_list)) + txn.execute(sql, args) - rows = yield self._execute(None, sql, *args) + return len(txn.fetchall()) > 0 - defer.returnValue(len(rows) > 0) + return self.runInteraction("user_rooms_intersect", interaction) -- cgit 1.4.1 From 52b2c6c9c73d47a269756d8da57b4dcff54e0d21 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jan 2015 14:56:57 +0000 Subject: Don't include None's in _get_events_txn --- synapse/storage/_base.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 728d1df8fa..a0b7f943fd 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -446,7 +446,7 @@ class SQLBaseStore(object): if not event_ids: return [] - return [ + events = [ self._get_event_txn( txn, event_id, check_redacted=check_redacted, @@ -455,6 +455,8 @@ class SQLBaseStore(object): for event_id in event_ids ] + return [e for e in events if e] + def _get_event_txn(self, txn, event_id, check_redacted=True, get_prev_content=False): sql = ( -- cgit 1.4.1 From 03a501456ca8815a7d6fd8ea84d9c2a1feba33cf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jan 2015 15:22:28 +0000 Subject: Time how long calls to _get_destination_retry_timings take --- synapse/storage/transactions.py | 3 ++ synapse/util/logutils.py | 73 ++++++++++++++++++++++++++++++++++------- 2 files changed, 64 insertions(+), 12 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index e06ef35690..36ddf30d65 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -19,6 +19,8 @@ from collections import namedtuple from twisted.internet import defer +from synapse.util.logutils import time_function + import logging logger = logging.getLogger(__name__) @@ -228,6 +230,7 @@ class TransactionStore(SQLBaseStore): "get_destination_retry_timings", self._get_destination_retry_timings, destination) + @time_function def _get_destination_retry_timings(cls, txn, destination): query = DestinationsTable.select_statement("destination = ?") txn.execute(query, (destination,)) diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py index 18ba405c47..c4dfb69c51 100644 --- a/synapse/util/logutils.py +++ b/synapse/util/logutils.py @@ -19,14 +19,37 @@ from functools import wraps import logging import inspect +import time + + +_TIME_FUNC_ID = 0 + + +def _log_debug_as_f(f, msg, msg_args): + name = f.__module__ + logger = logging.getLogger(name) + + if logger.isEnabledFor(logging.DEBUG): + lineno = f.func_code.co_firstlineno + pathname = f.func_code.co_filename + + record = logging.LogRecord( + name=name, + level=logging.DEBUG, + pathname=pathname, + lineno=lineno, + msg=msg, + args=msg_args, + exc_info=None + ) + + logger.handle(record) def log_function(f): """ Function decorator that logs every call to that function. 
""" func_name = f.__name__ - lineno = f.func_code.co_firstlineno - pathname = f.func_code.co_filename @wraps(f) def wrapped(*args, **kwargs): @@ -52,24 +75,50 @@ def log_function(f): "args": ", ".join(func_args) } - record = logging.LogRecord( - name=name, - level=level, - pathname=pathname, - lineno=lineno, - msg="Invoked '%(func_name)s' with args: %(args)s", - args=msg_args, - exc_info=None + _log_debug_as_f( + f, + "Invoked '%(func_name)s' with args: %(args)s", + msg_args ) - logger.handle(record) - return f(*args, **kwargs) wrapped.__name__ = func_name return wrapped +def time_function(f): + func_name = f.__name__ + + @wraps(f) + def wrapped(*args, **kwargs): + global _TIME_FUNC_ID + id = _TIME_FUNC_ID + _TIME_FUNC_ID += 1 + + start = time.clock() * 1000 + + try: + _log_debug_as_f( + f, + "[FUNC START] {%s-%d}", + (func_name, _TIME_FUNC_ID), + ) + + r = f(*args, **kwargs) + finally: + end = time.clock() * 1000 + _log_debug_as_f( + f, + "[FUNC END] {%s-%d} %f", + (func_name, _TIME_FUNC_ID, end-start,), + ) + + return r + + return wrapped + + def trace_function(f): func_name = f.__name__ linenum = f.func_code.co_firstlineno -- cgit 1.4.1 From 9bd07bed238337151ae79dc948f49cdf7141578c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jan 2015 15:28:56 +0000 Subject: Actually time that function --- synapse/storage/transactions.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage') diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 36ddf30d65..9d14f89303 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -200,6 +200,7 @@ class TransactionStore(SQLBaseStore): self._get_transactions_after, transaction_id, destination ) + @time_function def _get_transactions_after(cls, txn, transaction_id, destination): where = ( "destination = ? AND id > (select id FROM %s WHERE " -- cgit 1.4.1 From f6da237c353d598946a6c81260653203602800c2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jan 2015 15:40:38 +0000 Subject: Add index on transaction_id to sent_transcations --- synapse/storage/schema/transactions.sql | 1 + synapse/storage/transactions.py | 4 ---- 2 files changed, 1 insertion(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/transactions.sql b/synapse/storage/schema/transactions.sql index 86f530d82a..2d30f99b06 100644 --- a/synapse/storage/schema/transactions.sql +++ b/synapse/storage/schema/transactions.sql @@ -42,6 +42,7 @@ CREATE INDEX IF NOT EXISTS sent_transaction_dest ON sent_transactions(destinatio CREATE INDEX IF NOT EXISTS sent_transaction_dest_referenced ON sent_transactions( destination ); +CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id); -- So that we can do an efficient look up of all transactions that have yet to be successfully -- sent. 
CREATE INDEX IF NOT EXISTS sent_transaction_sent ON sent_transactions(response_code); diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 9d14f89303..e06ef35690 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -19,8 +19,6 @@ from collections import namedtuple from twisted.internet import defer -from synapse.util.logutils import time_function - import logging logger = logging.getLogger(__name__) @@ -200,7 +198,6 @@ class TransactionStore(SQLBaseStore): self._get_transactions_after, transaction_id, destination ) - @time_function def _get_transactions_after(cls, txn, transaction_id, destination): where = ( "destination = ? AND id > (select id FROM %s WHERE " @@ -231,7 +228,6 @@ class TransactionStore(SQLBaseStore): "get_destination_retry_timings", self._get_destination_retry_timings, destination) - @time_function def _get_destination_retry_timings(cls, txn, destination): query = DestinationsTable.select_statement("destination = ?") txn.execute(query, (destination,)) -- cgit 1.4.1 From a01416cf217db5e442ff90715cbe1955b67b1efb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jan 2015 15:42:18 +0000 Subject: Add delta and bump DB version --- synapse/storage/__init__.py | 2 +- synapse/storage/schema/delta/v11.sql | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/v11.sql (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 8d482decee..4beb951b9f 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -66,7 +66,7 @@ SCHEMAS = [ # Remember to update this number every time an incompatible change is made to # database schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 10 +SCHEMA_VERSION = 11 class _RollbackButIsFineException(Exception): diff --git a/synapse/storage/schema/delta/v11.sql b/synapse/storage/schema/delta/v11.sql new file mode 100644 index 0000000000..313592221b --- /dev/null +++ b/synapse/storage/schema/delta/v11.sql @@ -0,0 +1,16 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id); \ No newline at end of file -- cgit 1.4.1 From fd9a8db7ea93225af1774a604a97e84315cdccc8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jan 2015 15:59:31 +0000 Subject: Only fetch the columns we need. --- synapse/storage/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index a5e1c38f75..bedc3c6c52 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -271,7 +271,7 @@ class StreamStore(SQLBaseStore): # TODO (erikj): Handle compressed feedback sql = ( - "SELECT * FROM events " + "SELECT stream_ordering, topological_ordering, event_id FROM events " "WHERE room_id = ? 
AND stream_ordering <= ? AND outlier = 0 " "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? " ) -- cgit 1.4.1 From 36a2a877e2920a20a679af96cbb1c4d041b89f96 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jan 2015 16:34:26 +0000 Subject: Use time.time() instead of time.clock() --- synapse/storage/_base.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index a0b7f943fd..f660fc6eaf 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -64,7 +64,7 @@ class LoggingTransaction(object): # Don't let logging failures stop SQL from working pass - start = time.clock() * 1000 + start = time.time() * 1000 try: return self.txn.execute( sql, *args, **kwargs @@ -73,7 +73,7 @@ class LoggingTransaction(object): logger.exception("[SQL FAIL] {%s}", self.name) raise finally: - end = time.clock() * 1000 + end = time.time() * 1000 sql_logger.debug("[SQL time] {%s} %f", self.name, end - start) @@ -93,7 +93,7 @@ class SQLBaseStore(object): def inner_func(txn, *args, **kwargs): with LoggingContext("runInteraction") as context: current_context.copy_to(context) - start = time.clock() * 1000 + start = time.time() * 1000 txn_id = SQLBaseStore._TXN_ID # We don't really need these to be unique, so lets stop it from @@ -109,7 +109,7 @@ class SQLBaseStore(object): logger.exception("[TXN FAIL] {%s}", name) raise finally: - end = time.clock() * 1000 + end = time.time() * 1000 transaction_logger.debug( "[TXN END] {%s} %f", name, end - start -- cgit 1.4.1
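
The batching in the first patch above (f3788e3c) splits a long list of event IDs into chunks of 50 and fetches each chunk with one OR-joined WHERE clause. Below is a minimal sketch of that strategy in plain Python with sqlite3; the event_json layout is trimmed down and the fetch_events_by_id helper is invented for illustration, it is not the Synapse API.

import sqlite3

def fetch_events_by_id(conn, event_ids, chunk_size=50):
    # One query per chunk of at most chunk_size IDs, instead of one query
    # per event ID.
    rows = []
    for i in range(0, len(event_ids), chunk_size):
        chunk = event_ids[i:i + chunk_size]
        where_clause = " OR ".join("event_id = ?" for _ in chunk)
        sql = "SELECT event_id, json FROM event_json WHERE %s" % where_clause
        rows.extend(conn.execute(sql, chunk).fetchall())
    return rows

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event_json (event_id TEXT PRIMARY KEY, json TEXT)")
conn.executemany(
    "INSERT INTO event_json VALUES (?, ?)",
    [("$ev%d" % i, "{}") for i in range(120)],
)
print(len(fetch_events_by_id(conn, ["$ev%d" % i for i in range(120)])))  # 120

Note that a later patch in this series (52d85190, "Don't do batching when getting events.") backs the batching out again in favour of per-event lookups, so the chunk size of 50 is a tuning knob rather than a requirement.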
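
Several of the later patches (get_users_in_room, the stream store rewrite) follow the same pattern: select only the columns that are needed and run all the queries for one logical operation inside a single transaction function handed to runInteraction. Below is a rough standalone analogue of that pattern, assuming a simplified room_memberships table; run_interaction here is a stand-in for the SQLBaseStore method, not the real API.

import sqlite3

def run_interaction(conn, desc, func, *args):
    # Run func(cursor, *args) inside one transaction, committing on success
    # and rolling back on failure.
    cur = conn.cursor()
    try:
        result = func(cur, *args)
        conn.commit()
        return result
    except Exception:
        conn.rollback()
        print("[TXN FAIL] {%s}" % desc)
        raise

def get_users_in_room(txn, room_id):
    # Mirror the shape of RoomMemberStore.get_users_in_room: fetch only the
    # user IDs rather than loading and parsing full membership events (the
    # real query also joins current_state_events, omitted here).
    txn.execute(
        "SELECT user_id FROM room_memberships"
        " WHERE membership = ? AND room_id = ?",
        ("join", room_id),
    )
    return [r[0] for r in txn.fetchall()]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE room_memberships (room_id TEXT, user_id TEXT, membership TEXT)"
)
conn.execute("INSERT INTO room_memberships VALUES ('!a:hs', '@alice:hs', 'join')")
print(run_interaction(conn, "get_users_in_room", get_users_in_room, "!a:hs"))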
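
The get_rooms_for_user_where_membership_is patch likewise returns lightweight RoomsForUser namedtuples built from cursor_to_dict rows instead of parsed events. A short sketch of that shape; fake_rows below stands in for what cursor_to_dict would return from the database.

from collections import namedtuple

RoomsForUser = namedtuple("RoomsForUser", ("room_id", "sender", "membership"))

# Stand-in for cursor_to_dict output: one dict per row, keyed by column name.
fake_rows = [
    {"room_id": "!a:hs", "sender": "@bob:hs", "membership": "invite"},
    {"room_id": "!b:hs", "sender": "@bob:hs", "membership": "join"},
]

rooms = [RoomsForUser(**r) for r in fake_rows]
print([r.room_id for r in rooms if r.membership == "join"])  # ['!b:hs']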
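
The logging patches add a time_function decorator and later switch the store's timers from time.clock() to time.time(); time.clock() measures CPU time on Unix, so it mostly misses time spent waiting on the database. A minimal sketch of such a decorator using wall-clock milliseconds, independent of the Synapse logging helpers:

import functools
import logging
import time

logger = logging.getLogger(__name__)

def time_function(f):
    # Log how long each call to f took, in milliseconds of wall-clock time.
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        start = time.time() * 1000
        try:
            return f(*args, **kwargs)
        finally:
            end = time.time() * 1000
            logger.debug("[FUNC END] %s %f", f.__name__, end - start)
    return wrapped

@time_function
def slow_query():
    time.sleep(0.05)  # stand-in for a database call

logging.basicConfig(level=logging.DEBUG)
slow_query()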
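
The v11 schema delta adds an index on sent_transactions(transaction_id) so that lookups by transaction ID (such as the subquery in _get_transactions_after) do not have to scan the table. One way to check that such an index is actually used, sketched against a trimmed-down table rather than the full Synapse schema:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sent_transactions"
    " (id INTEGER PRIMARY KEY, transaction_id TEXT, destination TEXT)"
)
conn.execute(
    "CREATE INDEX IF NOT EXISTS sent_transaction_txn_id"
    " ON sent_transactions(transaction_id)"
)
plan = conn.execute(
    "EXPLAIN QUERY PLAN"
    " SELECT id FROM sent_transactions WHERE transaction_id = ?",
    ("txn-1",),
).fetchall()
for row in plan:
    # Expect a SEARCH using sent_transaction_txn_id rather than a full SCAN.
    print(row)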