author    Mark Haines <mark.haines@matrix.org>  2015-11-09 19:29:32 +0000
committer Mark Haines <mark.haines@matrix.org>  2015-11-09 19:29:32 +0000
commit    2ede7aa8a1da20b735797cc9230da7bbeb55efc3 (patch)
tree      d0672dd55d4bd66e8244965c0150fd8dfb1f0295 /synapse/storage/search.py
parent    Add storage module for tracking background updates. (diff)
download  synapse-2ede7aa8a1da20b735797cc9230da7bbeb55efc3.tar.xz
Add background update task for reindexing event search
Diffstat (limited to 'synapse/storage/search.py')
 synapse/storage/search.py | 98 ++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 96 insertions(+), 2 deletions(-)
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 3cea2011fa..f7c269865d 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -15,7 +15,7 @@
 
 from twisted.internet import defer
 
-from _base import SQLBaseStore
+from .background_updates import BackgroundUpdateStore
 from synapse.api.errors import SynapseError
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
@@ -25,7 +25,101 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class SearchStore(SQLBaseStore):
+class SearchStore(BackgroundUpdateStore):
+
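+    # The name under which this update's progress is tracked by the
+    # BackgroundUpdateStore.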
+    EVENT_SEARCH_UPDATE_NAME = "event_search"
+
+    @defer.inlineCallbacks
+    def _background_reindex_search(self, progress, batch_size):
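+        """Scan the events table in batches, extracting the searchable
+        text from m.room.name/topic/message events and inserting it
+        into event_search. `progress` carries the stream ordering bounds
+        and running row count; at most `batch_size` events are handled
+        per call.
+        """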
+        target_min_stream_id = progress["target_min_stream_id"]
+        max_stream_id = progress["max_stream_id"]
+        rows_inserted = progress.get("rows_inserted", 0)
+
+        INSERT_CLUMP_SIZE = 1000
+        TYPES = ["m.room.name", "m.room.message", "m.room.topic"]
+
+        def reindex_search_txn(txn):
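+            # Fetch the next batch of events, walking backwards through
+            # the stream so that the oldest row seen becomes the upper
+            # bound for the next batch.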
+            sql = (
+                "SELECT stream_id, event_id FROM events"
+                " WHERE ? <= stream_ordering AND stream_ordering < ?"
+                " AND (%s)"
+                " ORDER BY stream_ordering DESC"
+                " LIMIT ?"
+            ) % (" OR ".join("type = '%s'" % TYPES),)
+
+            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+            rows = txn.fetchall()
+            if not rows:
+                return None
+
+            min_stream_id = rows[-1][0]
+            event_ids = [row[1] for row in rows]
+
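+            # Load the full event objects so the searchable text can be
+            # pulled out of their content.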
+            events = self._get_events_txn(txn, event_ids)
+
+            event_search_rows = []
+            for event in events:
+                try:
+                    event_id = event.event_id
+                    room_id = event.room_id
+                    content = event.content
+                    if event.type == "m.room.message":
+                        key = "content.body"
+                        value = content["body"]
+                    elif event.type == "m.room.topic":
+                        key = "content.topic"
+                        value = content["topic"]
+                    elif event.type == "m.room.name":
+                        key = "content.name"
+                        value = content["name"]
+                except (KeyError, AttributeError):
+                    # If the event is missing a necessary field then
+                    # skip over it.
+                    continue
+
+                event_search_rows.append((event_id, room_id, key, value))
+
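+            # Postgres stores a tsvector for full text search, whereas
+            # the SQLite table takes the raw value directly.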
+            if isinstance(self.database_engine, PostgresEngine):
+                sql = (
+                    "INSERT INTO event_search (event_id, room_id, key, vector)"
+                    " VALUES (?,?,?,to_tsvector('english', ?))"
+                )
+            elif isinstance(self.database_engine, Sqlite3Engine):
+                sql = (
+                    "INSERT INTO event_search (event_id, room_id, key, value)"
+                    " VALUES (?,?,?,?)"
+                )
+            else:
+                # This should be unreachable.
+                raise Exception("Unrecognized database engine")
+
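+            # Insert in clumps of INSERT_CLUMP_SIZE rather than issuing
+            # one INSERT per row.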
+            for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE):
+                clump = event_search_rows[index:index + INSERT_CLUMP_SIZE]
+                txn.executemany(sql, clump)
+
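+            # Record how far we have got so that the update can resume
+            # after a restart.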
+            progress = {
+                "target_max_stream_id": target_min_stream_id,
+                "max_stream_id": min_stream_id,
+                "rows_inserted": rows_inserted + len(event_search_rows)
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_SEARCH_UPDATE_NAME, progress
+            )
+
+            return len(event_search_rows)
+
+        result = yield self.runInteraction(
+            self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn
+        )
+
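+        # A None result means there were no rows left to process, so
+        # the background update is complete.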
+        if result is None:
+            yield self._end_background_update(self.EVENT_SEARCH_UPDATE_NAME)
+
+        defer.returnValue(result)
+
     @defer.inlineCallbacks
     def search_msgs(self, room_ids, search_term, keys):
         """Performs a full text search over events with given keys.