From d5ae1f129143d6436a238fd7882e39168f944846 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Sep 2016 10:03:48 +0100 Subject: Ensure we don't mutate state cache entries --- synapse/storage/state.py | 45 +++++++++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 20 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index fdbdade536..ec8c62b653 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -817,27 +817,32 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def _background_index_state(self, progress, batch_size): - def reindex_txn(txn): - if isinstance(self.database_engine, PostgresEngine): - txn.execute( - "CREATE INDEX CONCURRENTLY state_groups_state_type_idx" - " ON state_groups_state(state_group, type, state_key)" - ) - txn.execute( - "DROP INDEX IF EXISTS state_groups_state_id" - ) - else: - txn.execute( - "CREATE INDEX state_groups_state_type_idx" - " ON state_groups_state(state_group, type, state_key)" - ) - txn.execute( - "DROP INDEX IF EXISTS state_groups_state_id" - ) + def reindex_txn(conn): + conn.rollback() + # postgres insists on autocommit for the index + conn.set_session(autocommit=True) + try: + txn = conn.cursor() + if isinstance(self.database_engine, PostgresEngine): + txn.execute( + "CREATE INDEX CONCURRENTLY state_groups_state_type_idx" + " ON state_groups_state(state_group, type, state_key)" + ) + txn.execute( + "DROP INDEX IF EXISTS state_groups_state_id" + ) + else: + txn.execute( + "CREATE INDEX state_groups_state_type_idx" + " ON state_groups_state(state_group, type, state_key)" + ) + txn.execute( + "DROP INDEX IF EXISTS state_groups_state_id" + ) + finally: + conn.set_session(autocommit=False) - yield self.runInteraction( - self.STATE_GROUP_INDEX_UPDATE_NAME, reindex_txn - ) + yield self.runWithConnection(reindex_txn) yield self._end_background_update(self.STATE_GROUP_INDEX_UPDATE_NAME) -- cgit 1.4.1 From 00f51493f5726210bf649889ed3c03b56fddbe1d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 14 Sep 2016 10:18:30 +0100 Subject: Fix reindex --- synapse/storage/state.py | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index ec8c62b653..7eb342674c 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -819,11 +819,11 @@ class StateStore(SQLBaseStore): def _background_index_state(self, progress, batch_size): def reindex_txn(conn): conn.rollback() - # postgres insists on autocommit for the index - conn.set_session(autocommit=True) - try: - txn = conn.cursor() - if isinstance(self.database_engine, PostgresEngine): + if isinstance(self.database_engine, PostgresEngine): + # postgres insists on autocommit for the index + conn.set_session(autocommit=True) + try: + txn = conn.cursor() txn.execute( "CREATE INDEX CONCURRENTLY state_groups_state_type_idx" " ON state_groups_state(state_group, type, state_key)" @@ -831,16 +831,17 @@ class StateStore(SQLBaseStore): txn.execute( "DROP INDEX IF EXISTS state_groups_state_id" ) - else: - txn.execute( - "CREATE INDEX state_groups_state_type_idx" - " ON state_groups_state(state_group, type, state_key)" - ) - txn.execute( - "DROP INDEX IF EXISTS state_groups_state_id" - ) - finally: - conn.set_session(autocommit=False) + finally: + conn.set_session(autocommit=False) + else: + txn = conn.cursor() + txn.execute( + "CREATE INDEX state_groups_state_type_idx" + " ON state_groups_state(state_group, type, state_key)" + ) + txn.execute( + "DROP INDEX IF EXISTS state_groups_state_id" + ) yield self.runWithConnection(reindex_txn) -- cgit 1.4.1 From cf3e1cc20022273244530d2a9739bdc5392a0941 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Sep 2016 17:16:24 +0100 Subject: Fix perf of fetching state in SQLite --- synapse/storage/state.py | 61 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 20 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 7eb342674c..a82ba1d1d9 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -307,6 +307,9 @@ class StateStore(SQLBaseStore): def _get_state_groups_from_groups_txn(self, txn, groups, types=None): results = {group: {} for group in groups} + if types is not None: + types = list(set(types)) # deduplicate types list + if isinstance(self.database_engine, PostgresEngine): # Temporarily disable sequential scans in this transaction. This is # a temporary hack until we can add the right indices in @@ -379,6 +382,44 @@ class StateStore(SQLBaseStore): next_group = group while next_group: + # We did this before by getting the list of group ids, and + # then passing that list to sqlite to get latest event for + # each (type, state_key). However, that was terribly slow + # without the right indicies (which we can't add until + # after we finish deduping state, which requires this func) + if types is not None: + args = [next_group] + [i for typ in types for i in typ] + txn.execute( + "SELECT type, state_key, event_id FROM state_groups_state" + " WHERE state_group = ? %s" % (where_clause,), + args + ) + rows = txn.fetchall() + + results[group].update({ + (typ, state_key): event_id + for typ, state_key, event_id in rows + if (typ, state_key) not in results[group] + }) + + # If the lengths match then we must have all the types, + # so no need to go walk further down the tree. + if len(results[group]) == len(types): + break + else: + txn.execute( + "SELECT type, state_key, event_id FROM state_groups_state" + " WHERE state_group = ?", + (next_group,) + ) + rows = txn.fetchall() + + results[group].update({ + (typ, state_key): event_id + for typ, state_key, event_id in rows + if (typ, state_key) not in results[group] + }) + next_group = self._simple_select_one_onecol_txn( txn, table="state_group_edges", @@ -389,26 +430,6 @@ class StateStore(SQLBaseStore): if next_group: group_tree.append(next_group) - sql = (""" - SELECT type, state_key, event_id FROM state_groups_state - INNER JOIN ( - SELECT type, state_key, max(state_group) as state_group - FROM state_groups_state - WHERE state_group IN (%s) %s - GROUP BY type, state_key - ) USING (type, state_key, state_group); - """) % (",".join("?" for _ in group_tree), where_clause,) - - args = list(group_tree) - if types is not None: - args.extend([i for typ in types for i in typ]) - - txn.execute(sql, args) - rows = self.cursor_to_dict(txn) - for row in rows: - key = (row["type"], row["state_key"]) - results[group][key] = row["event_id"] - return results @defer.inlineCallbacks -- cgit 1.4.1 From 13122e5e246d7dd088ddb8d345487ddb23a66f6b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Sep 2016 09:21:51 +0100 Subject: Remove unused variable --- synapse/storage/state.py | 3 --- 1 file changed, 3 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index a82ba1d1d9..45478c7a5a 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -378,7 +378,6 @@ class StateStore(SQLBaseStore): # We don't use WITH RECURSIVE on sqlite3 as there are distributions # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) for group in groups: - group_tree = [group] next_group = group while next_group: @@ -427,8 +426,6 @@ class StateStore(SQLBaseStore): retcol="prev_state_group", allow_none=True, ) - if next_group: - group_tree.append(next_group) return results -- cgit 1.4.1 From 4974147aa36eca7e5a247c3fca6e259bda1d51ba Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Sep 2016 09:27:54 +0100 Subject: Remove duplication --- synapse/storage/state.py | 50 ++++++++++++++++++------------------------------ 1 file changed, 19 insertions(+), 31 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 45478c7a5a..49abf0ac74 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -386,38 +386,26 @@ class StateStore(SQLBaseStore): # each (type, state_key). However, that was terribly slow # without the right indicies (which we can't add until # after we finish deduping state, which requires this func) - if types is not None: - args = [next_group] + [i for typ in types for i in typ] - txn.execute( - "SELECT type, state_key, event_id FROM state_groups_state" - " WHERE state_group = ? %s" % (where_clause,), - args - ) - rows = txn.fetchall() - - results[group].update({ - (typ, state_key): event_id - for typ, state_key, event_id in rows - if (typ, state_key) not in results[group] - }) - - # If the lengths match then we must have all the types, - # so no need to go walk further down the tree. - if len(results[group]) == len(types): - break - else: - txn.execute( - "SELECT type, state_key, event_id FROM state_groups_state" - " WHERE state_group = ?", - (next_group,) - ) - rows = txn.fetchall() + args = [next_group] + if types: + args.extend(i for typ in types for i in typ) - results[group].update({ - (typ, state_key): event_id - for typ, state_key, event_id in rows - if (typ, state_key) not in results[group] - }) + txn.execute( + "SELECT type, state_key, event_id FROM state_groups_state" + " WHERE state_group = ? %s" % (where_clause,), + args + ) + rows = txn.fetchall() + results[group].update({ + (typ, state_key): event_id + for typ, state_key, event_id in rows + if (typ, state_key) not in results[group] + }) + + # If the lengths match then we must have all the types, + # so no need to go walk further down the tree. + if types is not None and len(results[group]) == len(types): + break next_group = self._simple_select_one_onecol_txn( txn, -- cgit 1.4.1