summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/schema/delta/25/fts.py100
-rw-r--r--synapse/storage/search.py8
2 files changed, 28 insertions, 80 deletions
diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py
index b7cd0ce3b8..e408d36da0 100644
--- a/synapse/storage/schema/delta/25/fts.py
+++ b/synapse/storage/schema/delta/25/fts.py
@@ -22,7 +22,7 @@ import ujson
 logger = logging.getLogger(__name__)
 
 
-POSTGRES_SQL = """
+POSTGRES_TABLE = """
 CREATE TABLE IF NOT EXISTS event_search (
     event_id TEXT,
     room_id TEXT,
@@ -31,22 +31,6 @@ CREATE TABLE IF NOT EXISTS event_search (
     vector tsvector
 );
 
-INSERT INTO event_search SELECT
-    event_id, room_id, json::json->>'sender', 'content.body',
-    to_tsvector('english', json::json->'content'->>'body')
-    FROM events NATURAL JOIN event_json WHERE type = 'm.room.message';
-
-INSERT INTO event_search SELECT
-    event_id, room_id, json::json->>'sender', 'content.name',
-    to_tsvector('english', json::json->'content'->>'name')
-    FROM events NATURAL JOIN event_json WHERE type = 'm.room.name';
-
-INSERT INTO event_search SELECT
-    event_id, room_id, json::json->>'sender', 'content.topic',
-    to_tsvector('english', json::json->'content'->>'topic')
-    FROM events NATURAL JOIN event_json WHERE type = 'm.room.topic';
-
-
 CREATE INDEX event_search_fts_idx ON event_search USING gin(vector);
 CREATE INDEX event_search_ev_idx ON event_search(event_id);
 CREATE INDEX event_search_ev_ridx ON event_search(room_id);
@@ -61,67 +45,31 @@ SQLITE_TABLE = (
 
 def run_upgrade(cur, database_engine, *args, **kwargs):
     if isinstance(database_engine, PostgresEngine):
-        run_postgres_upgrade(cur)
+        for statement in get_statements(POSTGRES_TABLE.splitlines()):
+            cur.execute(statement)
         return
 
     if isinstance(database_engine, Sqlite3Engine):
-        run_sqlite_upgrade(cur)
-        return
-
-
-def run_postgres_upgrade(cur):
-    for statement in get_statements(POSTGRES_SQL.splitlines()):
-        cur.execute(statement)
-
-
-def run_sqlite_upgrade(cur):
         cur.execute(SQLITE_TABLE)
+        return
 
-        rowid = -1
-        while True:
-            cur.execute(
-                "SELECT rowid, json FROM event_json"
-                " WHERE rowid > ?"
-                " ORDER BY rowid ASC LIMIT 100",
-                (rowid,)
-            )
-
-            res = cur.fetchall()
-
-            if not res:
-                break
-
-            events = [
-                ujson.loads(js)
-                for _, js in res
-            ]
-
-            rowid = max(rid for rid, _ in res)
-
-            rows = []
-            for ev in events:
-                content = ev.get("content", {})
-                body = content.get("body", None)
-                name = content.get("name", None)
-                topic = content.get("topic", None)
-                sender = ev.get("sender", None)
-                if ev["type"] == "m.room.message" and body:
-                    rows.append((
-                        ev["event_id"], ev["room_id"], sender, "content.body", body
-                    ))
-                if ev["type"] == "m.room.name" and name:
-                    rows.append((
-                        ev["event_id"], ev["room_id"], sender, "content.name", name
-                    ))
-                if ev["type"] == "m.room.topic" and topic:
-                    rows.append((
-                        ev["event_id"], ev["room_id"], sender, "content.topic", topic
-                    ))
-
-            if rows:
-                logger.info(rows)
-                cur.executemany(
-                    "INSERT INTO event_search (event_id, room_id, sender, key, value)"
-                    " VALUES (?,?,?,?,?)",
-                    rows
-                )
+    cur.execute("SELECT MIN(stream_ordering) FROM events")
+    rows = cur.fetchall()
+    min_stream_id = rows[0][0]
+
+    cur.execute("SELECT MAX(stream_ordering) FROM events")
+    rows = cur.fetchall()
+    max_stream_id = rows[0][0]
+
+    if min_stream_id is not None and max_stream_id is not None:
+        progress = {
+            "target_min_stream_id_inclusive": min_stream_id,
+            "max_stream_id_exclusive": max_stream_id + 1,
+            "rows_inserted": 0,
+        }
+        progress_json = ujson.dumps(progress)
+
+        cur.execute(
+            "INSERT into background_updates (update_name, progress_json)"
+            " VALUES (?, ?)", ("event_search", progress_json)
+        )
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index d170c546b5..3b19b118eb 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -37,8 +37,8 @@ class SearchStore(BackgroundUpdateStore):
 
     @defer.inlineCallbacks
     def _background_reindex_search(self, progress, batch_size):
-        target_min_stream_id = progress["target_min_stream_id"]
-        max_stream_id = progress["max_stream_id"]
+        target_min_stream_id = progress["target_min_stream_id_inclusive"]
+        max_stream_id = progress["max_stream_id_exclusive"]
         rows_inserted = progress.get("rows_inserted", 0)
 
         INSERT_CLUMP_SIZE = 1000
@@ -105,8 +105,8 @@ class SearchStore(BackgroundUpdateStore):
                 txn.execute_many(sql, clump)
 
             progress = {
-                "target_max_stream_id": target_min_stream_id,
-                "max_stream_id": min_stream_id,
+                "target_min_stream_id_inclusive": target_min_stream_id,
+                "max_stream_id_exclusive": min_stream_id,
                 "rows_inserted": rows_inserted + len(event_search_rows)
             }