diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py
index b7cd0ce3b8..e408d36da0 100644
--- a/synapse/storage/schema/delta/25/fts.py
+++ b/synapse/storage/schema/delta/25/fts.py
@@ -22,7 +22,7 @@ import ujson
logger = logging.getLogger(__name__)
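+# DDL for the table and its indexes only; the table is populated by the
+# "event_search" background update registered in run_upgrade below.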
-POSTGRES_SQL = """
+POSTGRES_TABLE = """
CREATE TABLE IF NOT EXISTS event_search (
     event_id TEXT,
     room_id TEXT,
@@ -31,22 +31,6 @@ CREATE TABLE IF NOT EXISTS event_search (
     vector tsvector
);
-INSERT INTO event_search SELECT
-    event_id, room_id, json::json->>'sender', 'content.body',
-    to_tsvector('english', json::json->'content'->>'body')
-    FROM events NATURAL JOIN event_json WHERE type = 'm.room.message';
-
-INSERT INTO event_search SELECT
-    event_id, room_id, json::json->>'sender', 'content.name',
-    to_tsvector('english', json::json->'content'->>'name')
-    FROM events NATURAL JOIN event_json WHERE type = 'm.room.name';
-
-INSERT INTO event_search SELECT
-    event_id, room_id, json::json->>'sender', 'content.topic',
-    to_tsvector('english', json::json->'content'->>'topic')
-    FROM events NATURAL JOIN event_json WHERE type = 'm.room.topic';
-
-
CREATE INDEX event_search_fts_idx ON event_search USING gin(vector);
CREATE INDEX event_search_ev_idx ON event_search(event_id);
CREATE INDEX event_search_ev_ridx ON event_search(room_id);
@@ -61,67 +45,31 @@ SQLITE_TABLE = (
def run_upgrade(cur, database_engine, *args, **kwargs):
if isinstance(database_engine, PostgresEngine):
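+        # Create the empty table and indexes only; existing events are
+        # indexed by the "event_search" background update rather than here.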
-        run_postgres_upgrade(cur)
-        return
-
-    if isinstance(database_engine, Sqlite3Engine):
-        run_sqlite_upgrade(cur)
-        return
-
-
-def run_postgres_upgrade(cur):
-    for statement in get_statements(POSTGRES_SQL.splitlines()):
-        cur.execute(statement)
-
-
-def run_sqlite_upgrade(cur):
-    cur.execute(SQLITE_TABLE)
+        for statement in get_statements(POSTGRES_TABLE.splitlines()):
+            cur.execute(statement)
+    elif isinstance(database_engine, Sqlite3Engine):
+        cur.execute(SQLITE_TABLE)
+    else:
+        raise Exception("Unrecognized database engine")
-
-    rowid = -1
-    while True:
-        cur.execute(
-            "SELECT rowid, json FROM event_json"
-            " WHERE rowid > ?"
-            " ORDER BY rowid ASC LIMIT 100",
-            (rowid,)
-        )
-
-        res = cur.fetchall()
-
-        if not res:
-            break
-
-        events = [
-            ujson.loads(js)
-            for _, js in res
-        ]
-
-        rowid = max(rid for rid, _ in res)
-
-        rows = []
-        for ev in events:
-            content = ev.get("content", {})
-            body = content.get("body", None)
-            name = content.get("name", None)
-            topic = content.get("topic", None)
-            sender = ev.get("sender", None)
-            if ev["type"] == "m.room.message" and body:
-                rows.append((
-                    ev["event_id"], ev["room_id"], sender, "content.body", body
-                ))
-            if ev["type"] == "m.room.name" and name:
-                rows.append((
-                    ev["event_id"], ev["room_id"], sender, "content.name", name
-                ))
-            if ev["type"] == "m.room.topic" and topic:
-                rows.append((
-                    ev["event_id"], ev["room_id"], sender, "content.topic", topic
-                ))
-
-        if rows:
-            logger.info(rows)
-            cur.executemany(
-                "INSERT INTO event_search (event_id, room_id, sender, key, value)"
-                " VALUES (?,?,?,?,?)",
-                rows
-            )
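+    # Rather than indexing existing events synchronously here, register a
+    # background update that does it in batches. Record the current bounds
+    # of events.stream_ordering so the job knows which range to cover.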
+ cur.execute("SELECT MIN(stream_ordering) FROM events")
+ rows = cur.fetchall()
+ min_stream_id = rows[0][0]
+
+ cur.execute("SELECT MAX(stream_ordering) FROM events")
+ rows = cur.fetchall()
+ max_stream_id = rows[0][0]
+
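+    # Only register the update if there is anything to index. The bounds
+    # form a half-open interval
+    # [target_min_stream_id_inclusive, max_stream_id_exclusive), hence the +1.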
+    if min_stream_id is not None and max_stream_id is not None:
+        progress = {
+            "target_min_stream_id_inclusive": min_stream_id,
+            "max_stream_id_exclusive": max_stream_id + 1,
+            "rows_inserted": 0,
+        }
+        progress_json = ujson.dumps(progress)
+
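+        # Enqueue the update for the background updates store to pick up.
+        # The '?' placeholders are sqlite-style; they are converted for the
+        # active engine below (psycopg2 expects '%s').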
+        sql = (
+            "INSERT into background_updates (update_name, progress_json)"
+            " VALUES (?, ?)"
+        )
+        sql = database_engine.convert_param_style(sql)
+        cur.execute(sql, ("event_search", progress_json))
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index d170c546b5..3b19b118eb 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -37,8 +37,8 @@ class SearchStore(BackgroundUpdateStore):
     @defer.inlineCallbacks
     def _background_reindex_search(self, progress, batch_size):
-        target_min_stream_id = progress["target_min_stream_id"]
-        max_stream_id = progress["max_stream_id"]
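+        # The progress keys spell out their bounds: the reindex works
+        # backwards from max_stream_id_exclusive towards
+        # target_min_stream_id_inclusive.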
+        target_min_stream_id = progress["target_min_stream_id_inclusive"]
+        max_stream_id = progress["max_stream_id_exclusive"]
         rows_inserted = progress.get("rows_inserted", 0)

         INSERT_CLUMP_SIZE = 1000
@@ -105,8 +105,8 @@ class SearchStore(BackgroundUpdateStore):
                 clump = event_search_rows[index:index + INSERT_CLUMP_SIZE]
                 txn.executemany(sql, clump)
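+            # min_stream_id is the lowest stream ordering indexed by this
+            # batch, so it becomes the new exclusive upper bound for the
+            # next run.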
             progress = {
-                "target_max_stream_id": target_min_stream_id,
-                "max_stream_id": min_stream_id,
+                "target_min_stream_id_inclusive": target_min_stream_id,
+                "max_stream_id_exclusive": min_stream_id,
                 "rows_inserted": rows_inserted + len(event_search_rows)
             }
|