diff options
author | Erik Johnston <erik@matrix.org> | 2015-11-11 17:19:51 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-11-11 17:19:51 +0000 |
commit | e21cef9bb56493cf05fa82e2d3262df213657cf6 (patch) | |
tree | 54fada4d427cfa90dc1a327ac01cc73c9a0fabad /synapse/storage/schema/delta/25 | |
parent | Merge pull request #358 from matrix-org/daniel/publicwritable (diff) | |
parent | Fix param style to work on both sqlite and postgres (diff) | |
download | synapse-e21cef9bb56493cf05fa82e2d3262df213657cf6.tar.xz |
Merge pull request #359 from matrix-org/markjh/incremental_indexing
Incremental background updates for db indexes
Diffstat (limited to 'synapse/storage/schema/delta/25')
-rw-r--r-- | synapse/storage/schema/delta/25/00background_updates.sql | 21 | ||||
-rw-r--r-- | synapse/storage/schema/delta/25/fts.py | 101 |
2 files changed, 47 insertions, 75 deletions
diff --git a/synapse/storage/schema/delta/25/00background_updates.sql b/synapse/storage/schema/delta/25/00background_updates.sql new file mode 100644 index 0000000000..41a9b59b1b --- /dev/null +++ b/synapse/storage/schema/delta/25/00background_updates.sql @@ -0,0 +1,21 @@ +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE IF NOT EXISTS background_updates( + update_name TEXT NOT NULL, -- The name of the background update. + progress_json TEXT NOT NULL, -- The current progress of the update as JSON. + CONSTRAINT background_updates_uniqueness UNIQUE (update_name) +); diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py index b7cd0ce3b8..5239d69073 100644 --- a/synapse/storage/schema/delta/25/fts.py +++ b/synapse/storage/schema/delta/25/fts.py @@ -22,7 +22,7 @@ import ujson logger = logging.getLogger(__name__) -POSTGRES_SQL = """ +POSTGRES_TABLE = """ CREATE TABLE IF NOT EXISTS event_search ( event_id TEXT, room_id TEXT, @@ -31,22 +31,6 @@ CREATE TABLE IF NOT EXISTS event_search ( vector tsvector ); -INSERT INTO event_search SELECT - event_id, room_id, json::json->>'sender', 'content.body', - to_tsvector('english', json::json->'content'->>'body') - FROM events NATURAL JOIN event_json WHERE type = 'm.room.message'; - -INSERT INTO event_search SELECT - event_id, room_id, json::json->>'sender', 'content.name', - to_tsvector('english', json::json->'content'->>'name') - FROM events NATURAL JOIN event_json WHERE type = 'm.room.name'; - -INSERT INTO event_search SELECT - event_id, room_id, json::json->>'sender', 'content.topic', - to_tsvector('english', json::json->'content'->>'topic') - FROM events NATURAL JOIN event_json WHERE type = 'm.room.topic'; - - CREATE INDEX event_search_fts_idx ON event_search USING gin(vector); CREATE INDEX event_search_ev_idx ON event_search(event_id); CREATE INDEX event_search_ev_ridx ON event_search(room_id); @@ -61,67 +45,34 @@ SQLITE_TABLE = ( def run_upgrade(cur, database_engine, *args, **kwargs): if isinstance(database_engine, PostgresEngine): - run_postgres_upgrade(cur) - return + for statement in get_statements(POSTGRES_TABLE.splitlines()): + cur.execute(statement) + elif isinstance(database_engine, Sqlite3Engine): + cur.execute(SQLITE_TABLE) + else: + raise Exception("Unrecognized database engine") - if isinstance(database_engine, Sqlite3Engine): - run_sqlite_upgrade(cur) - return + cur.execute("SELECT MIN(stream_ordering) FROM events") + rows = cur.fetchall() + min_stream_id = rows[0][0] + cur.execute("SELECT MAX(stream_ordering) FROM events") + rows = cur.fetchall() + max_stream_id = rows[0][0] -def run_postgres_upgrade(cur): - for statement in get_statements(POSTGRES_SQL.splitlines()): - cur.execute(statement) + if min_stream_id is not None and max_stream_id is not None: + progress = { + "target_min_stream_id_inclusive": min_stream_id, + "max_stream_id_exclusive": max_stream_id + 1, + "rows_inserted": 0, + } + progress_json = ujson.dumps(progress) + sql = ( + "INSERT into background_updates (update_name, progress_json)" + " VALUES (?, ?)" + ) -def run_sqlite_upgrade(cur): - cur.execute(SQLITE_TABLE) + sql = database_engine.convert_param_style(sql) - rowid = -1 - while True: - cur.execute( - "SELECT rowid, json FROM event_json" - " WHERE rowid > ?" - " ORDER BY rowid ASC LIMIT 100", - (rowid,) - ) - - res = cur.fetchall() - - if not res: - break - - events = [ - ujson.loads(js) - for _, js in res - ] - - rowid = max(rid for rid, _ in res) - - rows = [] - for ev in events: - content = ev.get("content", {}) - body = content.get("body", None) - name = content.get("name", None) - topic = content.get("topic", None) - sender = ev.get("sender", None) - if ev["type"] == "m.room.message" and body: - rows.append(( - ev["event_id"], ev["room_id"], sender, "content.body", body - )) - if ev["type"] == "m.room.name" and name: - rows.append(( - ev["event_id"], ev["room_id"], sender, "content.name", name - )) - if ev["type"] == "m.room.topic" and topic: - rows.append(( - ev["event_id"], ev["room_id"], sender, "content.topic", topic - )) - - if rows: - logger.info(rows) - cur.executemany( - "INSERT INTO event_search (event_id, room_id, sender, key, value)" - " VALUES (?,?,?,?,?)", - rows - ) + cur.execute(sql, ("event_search", progress_json)) |