From 0b78d8adf26d5890c4b50510bdfc04e08804ace2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Sep 2016 15:20:56 +0100 Subject: Fix _delete_old_forward_extrem_cache query --- synapse/storage/event_federation.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 3d62451de9..7a02bf5e63 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -398,12 +398,11 @@ class EventFederationStore(SQLBaseStore): sql = (""" DELETE FROM stream_ordering_to_exterm WHERE - ( - SELECT max(stream_ordering) AS stream_ordering + room_id IN ( + SELECT room_id AS stream_ordering FROM stream_ordering_to_exterm - WHERE room_id = stream_ordering_to_exterm.room_id - ) > ? - AND stream_ordering < ? + WHERE stream_ordering > ? + ) AND stream_ordering < ? """) txn.execute( sql, -- cgit 1.5.1 From 4f78108d8cda9a8d0394caf0c33ec27cae9ee2bc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Sep 2016 15:24:22 +0100 Subject: Readd entries to public_room_list_stream that were deleted --- synapse/storage/prepare_database.py | 2 +- .../storage/schema/delta/36/readd_public_rooms.sql | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/36/readd_public_rooms.sql (limited to 'synapse/storage') diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 7efbe51cda..08de3cc4c1 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 35 +SCHEMA_VERSION = 36 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/36/readd_public_rooms.sql b/synapse/storage/schema/delta/36/readd_public_rooms.sql new file mode 100644 index 0000000000..0de7e326d2 --- /dev/null +++ b/synapse/storage/schema/delta/36/readd_public_rooms.sql @@ -0,0 +1,22 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +INSERT INTO public_room_list_stream (stream_id, room_id, visibility) + SELECT 1, room_id, is_public FROM rooms AS r + WHERE is_public = CAST(1 AS BOOLEAN) + AND NOT EXISTS ( + SELECT room_id FROM stream_ordering_to_exterm WHERE room_id = r.room_id + ); -- cgit 1.5.1 From dc78db8c5641513ff01276ca50070a8d46ebce36 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Sep 2016 15:52:44 +0100 Subject: Update correct table --- synapse/storage/schema/delta/36/readd_public_rooms.sql | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/36/readd_public_rooms.sql b/synapse/storage/schema/delta/36/readd_public_rooms.sql index 0de7e326d2..5460a350b6 100644 --- a/synapse/storage/schema/delta/36/readd_public_rooms.sql +++ b/synapse/storage/schema/delta/36/readd_public_rooms.sql @@ -13,10 +13,14 @@ * limitations under the License. */ - -INSERT INTO public_room_list_stream (stream_id, room_id, visibility) - SELECT 1, room_id, is_public FROM rooms AS r - WHERE is_public = CAST(1 AS BOOLEAN) - AND NOT EXISTS ( - SELECT room_id FROM stream_ordering_to_exterm WHERE room_id = r.room_id +-- Re-add some entries to stream_ordering_to_exterm that were incorrectly deleted +INSERT INTO stream_ordering_to_exterm (stream_ordering, room_id, event_id) + SELECT + (SELECT max(stream_ordering) FROM events where room_id = e.room_id) AS stream_ordering, + room_id, + event_id + FROM event_forward_extremities AS e + WHERE NOT EXISTS ( + SELECT room_id FROM stream_ordering_to_exterm AS s + WHERE s.room_id = e.room_id ); -- cgit 1.5.1 From dc692556d6b3e65eec133f5bfa5dbbcd7c3c1350 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Sep 2016 16:28:47 +0100 Subject: Remove spurious AS clause --- synapse/storage/event_federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 7a02bf5e63..53feaa1960 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -399,7 +399,7 @@ class EventFederationStore(SQLBaseStore): DELETE FROM stream_ordering_to_exterm WHERE room_id IN ( - SELECT room_id AS stream_ordering + SELECT room_id FROM stream_ordering_to_exterm WHERE stream_ordering > ? ) AND stream_ordering < ? -- cgit 1.5.1 From 8009d843647a8c693340c7b9ec341066fb6db3b6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Sep 2016 16:46:59 +0100 Subject: Match against event_id, rather than room_id --- synapse/storage/schema/delta/36/readd_public_rooms.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/36/readd_public_rooms.sql b/synapse/storage/schema/delta/36/readd_public_rooms.sql index 5460a350b6..90d8fd18f9 100644 --- a/synapse/storage/schema/delta/36/readd_public_rooms.sql +++ b/synapse/storage/schema/delta/36/readd_public_rooms.sql @@ -16,7 +16,7 @@ -- Re-add some entries to stream_ordering_to_exterm that were incorrectly deleted INSERT INTO stream_ordering_to_exterm (stream_ordering, room_id, event_id) SELECT - (SELECT max(stream_ordering) FROM events where room_id = e.room_id) AS stream_ordering, + (SELECT stream_ordering FROM events where event_id = e.event_id) AS stream_ordering, room_id, event_id FROM event_forward_extremities AS e -- cgit 1.5.1 From cf3e1cc20022273244530d2a9739bdc5392a0941 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Sep 2016 17:16:24 +0100 Subject: Fix perf of fetching state in SQLite --- synapse/storage/state.py | 61 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 20 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 7eb342674c..a82ba1d1d9 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -307,6 +307,9 @@ class StateStore(SQLBaseStore): def _get_state_groups_from_groups_txn(self, txn, groups, types=None): results = {group: {} for group in groups} + if types is not None: + types = list(set(types)) # deduplicate types list + if isinstance(self.database_engine, PostgresEngine): # Temporarily disable sequential scans in this transaction. This is # a temporary hack until we can add the right indices in @@ -379,6 +382,44 @@ class StateStore(SQLBaseStore): next_group = group while next_group: + # We did this before by getting the list of group ids, and + # then passing that list to sqlite to get latest event for + # each (type, state_key). However, that was terribly slow + # without the right indicies (which we can't add until + # after we finish deduping state, which requires this func) + if types is not None: + args = [next_group] + [i for typ in types for i in typ] + txn.execute( + "SELECT type, state_key, event_id FROM state_groups_state" + " WHERE state_group = ? %s" % (where_clause,), + args + ) + rows = txn.fetchall() + + results[group].update({ + (typ, state_key): event_id + for typ, state_key, event_id in rows + if (typ, state_key) not in results[group] + }) + + # If the lengths match then we must have all the types, + # so no need to go walk further down the tree. + if len(results[group]) == len(types): + break + else: + txn.execute( + "SELECT type, state_key, event_id FROM state_groups_state" + " WHERE state_group = ?", + (next_group,) + ) + rows = txn.fetchall() + + results[group].update({ + (typ, state_key): event_id + for typ, state_key, event_id in rows + if (typ, state_key) not in results[group] + }) + next_group = self._simple_select_one_onecol_txn( txn, table="state_group_edges", @@ -389,26 +430,6 @@ class StateStore(SQLBaseStore): if next_group: group_tree.append(next_group) - sql = (""" - SELECT type, state_key, event_id FROM state_groups_state - INNER JOIN ( - SELECT type, state_key, max(state_group) as state_group - FROM state_groups_state - WHERE state_group IN (%s) %s - GROUP BY type, state_key - ) USING (type, state_key, state_group); - """) % (",".join("?" for _ in group_tree), where_clause,) - - args = list(group_tree) - if types is not None: - args.extend([i for typ in types for i in typ]) - - txn.execute(sql, args) - rows = self.cursor_to_dict(txn) - for row in rows: - key = (row["type"], row["state_key"]) - results[group][key] = row["event_id"] - return results @defer.inlineCallbacks -- cgit 1.5.1 From 13122e5e246d7dd088ddb8d345487ddb23a66f6b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Sep 2016 09:21:51 +0100 Subject: Remove unused variable --- synapse/storage/state.py | 3 --- 1 file changed, 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index a82ba1d1d9..45478c7a5a 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -378,7 +378,6 @@ class StateStore(SQLBaseStore): # We don't use WITH RECURSIVE on sqlite3 as there are distributions # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) for group in groups: - group_tree = [group] next_group = group while next_group: @@ -427,8 +426,6 @@ class StateStore(SQLBaseStore): retcol="prev_state_group", allow_none=True, ) - if next_group: - group_tree.append(next_group) return results -- cgit 1.5.1 From 4974147aa36eca7e5a247c3fca6e259bda1d51ba Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Sep 2016 09:27:54 +0100 Subject: Remove duplication --- synapse/storage/state.py | 50 ++++++++++++++++++------------------------------ 1 file changed, 19 insertions(+), 31 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 45478c7a5a..49abf0ac74 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -386,38 +386,26 @@ class StateStore(SQLBaseStore): # each (type, state_key). However, that was terribly slow # without the right indicies (which we can't add until # after we finish deduping state, which requires this func) - if types is not None: - args = [next_group] + [i for typ in types for i in typ] - txn.execute( - "SELECT type, state_key, event_id FROM state_groups_state" - " WHERE state_group = ? %s" % (where_clause,), - args - ) - rows = txn.fetchall() - - results[group].update({ - (typ, state_key): event_id - for typ, state_key, event_id in rows - if (typ, state_key) not in results[group] - }) - - # If the lengths match then we must have all the types, - # so no need to go walk further down the tree. - if len(results[group]) == len(types): - break - else: - txn.execute( - "SELECT type, state_key, event_id FROM state_groups_state" - " WHERE state_group = ?", - (next_group,) - ) - rows = txn.fetchall() + args = [next_group] + if types: + args.extend(i for typ in types for i in typ) - results[group].update({ - (typ, state_key): event_id - for typ, state_key, event_id in rows - if (typ, state_key) not in results[group] - }) + txn.execute( + "SELECT type, state_key, event_id FROM state_groups_state" + " WHERE state_group = ? %s" % (where_clause,), + args + ) + rows = txn.fetchall() + results[group].update({ + (typ, state_key): event_id + for typ, state_key, event_id in rows + if (typ, state_key) not in results[group] + }) + + # If the lengths match then we must have all the types, + # so no need to go walk further down the tree. + if types is not None and len(results[group]) == len(types): + break next_group = self._simple_select_one_onecol_txn( txn, -- cgit 1.5.1 From 9040c9ffa1b9de46dba95edbce66759ee5c1e6c7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Sep 2016 09:43:27 +0100 Subject: Fix background reindex of origin_server_ts The storage function `_get_events_txn` was removed everywhere except from this background reindex. The function was removed due to it being (almost) completely unused while also being large and complex. Therefore, instead of resurrecting `_get_events_txn` we manually reimplement the bits that are needed directly. --- synapse/storage/events.py | 44 +++++++++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 15 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 6dc46fa50f..6cf9d1176d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1355,39 +1355,53 @@ class EventsStore(SQLBaseStore): min_stream_id = rows[-1][0] event_ids = [row[1] for row in rows] - events = self._get_events_txn(txn, event_ids) + rows_to_update = [] - rows = [] - for event in events: - try: - event_id = event.event_id - origin_server_ts = event.origin_server_ts - except (KeyError, AttributeError): - # If the event is missing a necessary field then - # skip over it. - continue + chunks = [ + event_ids[i:i + 100] + for i in xrange(0, len(event_ids), 100) + ] + for chunk in chunks: + ev_rows = self._simple_select_many_txn( + txn, + table="event_json", + column="event_id", + iterable=chunk, + retcols=["event_id", "json"], + keyvalues={}, + ) - rows.append((origin_server_ts, event_id)) + for row in ev_rows: + event_id = row["event_id"] + event_json = json.loads(row["json"]) + try: + origin_server_ts = event_json["origin_server_ts"] + except (KeyError, AttributeError): + # If the event is missing a necessary field then + # skip over it. + continue + + rows_to_update.append((origin_server_ts, event_id)) sql = ( "UPDATE events SET origin_server_ts = ? WHERE event_id = ?" ) - for index in range(0, len(rows), INSERT_CLUMP_SIZE): - clump = rows[index:index + INSERT_CLUMP_SIZE] + for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE): + clump = rows_to_update[index:index + INSERT_CLUMP_SIZE] txn.executemany(sql, clump) progress = { "target_min_stream_id_inclusive": target_min_stream_id, "max_stream_id_exclusive": min_stream_id, - "rows_inserted": rows_inserted + len(rows) + "rows_inserted": rows_inserted + len(rows_to_update) } self._background_update_progress_txn( txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress ) - return len(rows) + return len(rows_to_update) result = yield self.runInteraction( self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn -- cgit 1.5.1