summary refs log tree commit diff
path: root/synapse/storage/schema/delta
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-11-11 17:19:51 +0000
committerErik Johnston <erik@matrix.org>2015-11-11 17:19:51 +0000
commite21cef9bb56493cf05fa82e2d3262df213657cf6 (patch)
tree54fada4d427cfa90dc1a327ac01cc73c9a0fabad /synapse/storage/schema/delta
parentMerge pull request #358 from matrix-org/daniel/publicwritable (diff)
parentFix param style to work on both sqlite and postgres (diff)
downloadsynapse-e21cef9bb56493cf05fa82e2d3262df213657cf6.tar.xz
Merge pull request #359 from matrix-org/markjh/incremental_indexing
Incremental background updates for db indexes
Diffstat (limited to 'synapse/storage/schema/delta')
-rw-r--r--synapse/storage/schema/delta/25/00background_updates.sql21
-rw-r--r--synapse/storage/schema/delta/25/fts.py101
2 files changed, 47 insertions, 75 deletions
diff --git a/synapse/storage/schema/delta/25/00background_updates.sql b/synapse/storage/schema/delta/25/00background_updates.sql
new file mode 100644
index 0000000000..41a9b59b1b
--- /dev/null
+++ b/synapse/storage/schema/delta/25/00background_updates.sql
@@ -0,0 +1,21 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE IF NOT EXISTS background_updates(
+    update_name TEXT NOT NULL, -- The name of the background update.
+    progress_json TEXT NOT NULL, -- The current progress of the update as JSON.
+    CONSTRAINT background_updates_uniqueness UNIQUE (update_name)
+);
diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py
index b7cd0ce3b8..5239d69073 100644
--- a/synapse/storage/schema/delta/25/fts.py
+++ b/synapse/storage/schema/delta/25/fts.py
@@ -22,7 +22,7 @@ import ujson
 logger = logging.getLogger(__name__)
 
 
-POSTGRES_SQL = """
+POSTGRES_TABLE = """
 CREATE TABLE IF NOT EXISTS event_search (
     event_id TEXT,
     room_id TEXT,
@@ -31,22 +31,6 @@ CREATE TABLE IF NOT EXISTS event_search (
     vector tsvector
 );
 
-INSERT INTO event_search SELECT
-    event_id, room_id, json::json->>'sender', 'content.body',
-    to_tsvector('english', json::json->'content'->>'body')
-    FROM events NATURAL JOIN event_json WHERE type = 'm.room.message';
-
-INSERT INTO event_search SELECT
-    event_id, room_id, json::json->>'sender', 'content.name',
-    to_tsvector('english', json::json->'content'->>'name')
-    FROM events NATURAL JOIN event_json WHERE type = 'm.room.name';
-
-INSERT INTO event_search SELECT
-    event_id, room_id, json::json->>'sender', 'content.topic',
-    to_tsvector('english', json::json->'content'->>'topic')
-    FROM events NATURAL JOIN event_json WHERE type = 'm.room.topic';
-
-
 CREATE INDEX event_search_fts_idx ON event_search USING gin(vector);
 CREATE INDEX event_search_ev_idx ON event_search(event_id);
 CREATE INDEX event_search_ev_ridx ON event_search(room_id);
@@ -61,67 +45,34 @@ SQLITE_TABLE = (
 
 def run_upgrade(cur, database_engine, *args, **kwargs):
     if isinstance(database_engine, PostgresEngine):
-        run_postgres_upgrade(cur)
-        return
+        for statement in get_statements(POSTGRES_TABLE.splitlines()):
+            cur.execute(statement)
+    elif isinstance(database_engine, Sqlite3Engine):
+        cur.execute(SQLITE_TABLE)
+    else:
+        raise Exception("Unrecognized database engine")
 
-    if isinstance(database_engine, Sqlite3Engine):
-        run_sqlite_upgrade(cur)
-        return
+    cur.execute("SELECT MIN(stream_ordering) FROM events")
+    rows = cur.fetchall()
+    min_stream_id = rows[0][0]
 
+    cur.execute("SELECT MAX(stream_ordering) FROM events")
+    rows = cur.fetchall()
+    max_stream_id = rows[0][0]
 
-def run_postgres_upgrade(cur):
-    for statement in get_statements(POSTGRES_SQL.splitlines()):
-        cur.execute(statement)
+    if min_stream_id is not None and max_stream_id is not None:
+        progress = {
+            "target_min_stream_id_inclusive": min_stream_id,
+            "max_stream_id_exclusive": max_stream_id + 1,
+            "rows_inserted": 0,
+        }
+        progress_json = ujson.dumps(progress)
 
+        sql = (
+            "INSERT into background_updates (update_name, progress_json)"
+            " VALUES (?, ?)"
+        )
 
-def run_sqlite_upgrade(cur):
-        cur.execute(SQLITE_TABLE)
+        sql = database_engine.convert_param_style(sql)
 
-        rowid = -1
-        while True:
-            cur.execute(
-                "SELECT rowid, json FROM event_json"
-                " WHERE rowid > ?"
-                " ORDER BY rowid ASC LIMIT 100",
-                (rowid,)
-            )
-
-            res = cur.fetchall()
-
-            if not res:
-                break
-
-            events = [
-                ujson.loads(js)
-                for _, js in res
-            ]
-
-            rowid = max(rid for rid, _ in res)
-
-            rows = []
-            for ev in events:
-                content = ev.get("content", {})
-                body = content.get("body", None)
-                name = content.get("name", None)
-                topic = content.get("topic", None)
-                sender = ev.get("sender", None)
-                if ev["type"] == "m.room.message" and body:
-                    rows.append((
-                        ev["event_id"], ev["room_id"], sender, "content.body", body
-                    ))
-                if ev["type"] == "m.room.name" and name:
-                    rows.append((
-                        ev["event_id"], ev["room_id"], sender, "content.name", name
-                    ))
-                if ev["type"] == "m.room.topic" and topic:
-                    rows.append((
-                        ev["event_id"], ev["room_id"], sender, "content.topic", topic
-                    ))
-
-            if rows:
-                logger.info(rows)
-                cur.executemany(
-                    "INSERT INTO event_search (event_id, room_id, sender, key, value)"
-                    " VALUES (?,?,?,?,?)",
-                    rows
-                )
+        cur.execute(sql, ("event_search", progress_json))