diff options
author | Mark Haines <mark.haines@matrix.org> | 2015-11-09 19:29:32 +0000 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2015-11-09 19:29:32 +0000 |
commit | 2ede7aa8a1da20b735797cc9230da7bbeb55efc3 (patch) | |
tree | d0672dd55d4bd66e8244965c0150fd8dfb1f0295 /synapse/storage | |
parent | Add storage module for tracking background updates. (diff) | |
download | synapse-2ede7aa8a1da20b735797cc9230da7bbeb55efc3.tar.xz |
Add background update task for reindexing event search
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/search.py | 98 |
1 files changed, 96 insertions, 2 deletions
diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 3cea2011fa..f7c269865d 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -15,7 +15,7 @@ from twisted.internet import defer -from _base import SQLBaseStore +from .background_updates import BackgroundUpdateStore from synapse.api.errors import SynapseError from synapse.storage.engines import PostgresEngine, Sqlite3Engine @@ -25,7 +25,101 @@ import logging logger = logging.getLogger(__name__) -class SearchStore(SQLBaseStore): +class SearchStore(BackgroundUpdateStore): + + EVENT_SEARCH_UPDATE_NAME = "event_search" + + + @defer.inlineCallbacks + def _background_reindex_search(self, progress, batch_size): + target_min_stream_id = progress["target_min_stream_id"] + max_stream_id = progress["max_stream_id"] + rows_inserted = progress.get("rows_inserted", 0) + + INSERT_CLUMP_SIZE = 1000 + TYPES = ["m.room.name", "m.room.message", "m.room.topic"] + + def reindex_search_txn(txn): + sql = ( + "SELECT stream_id, event_id FROM events" + " WHERE ? <= stream_ordering AND stream_ordering < ?" + " AND (%s)" + " ORDER BY stream_ordering DESC" + " LIMIT ?" + ) % (" OR ".join("type = '%s'" % TYPES),) + + txn.execute(sql, target_min_stream_id, max_stream_id, batch_size) + + rows = txn.fetch_all() + if not rows: + return None + + min_stream_id = rows[-1][0] + event_ids = [row[1] for row in rows] + + events = self._get_events_txn(txn, event_ids) + + event_search_rows = [] + for event in events: + try: + event_id = event.event_id + room_id = event.room_id + content = event.content + if event.type == "m.room.message": + key = "content.body" + value = content["body"] + elif event.type == "m.room.topic": + key = "content.topic" + value = content["topic"] + elif event.type == "m.room.name": + key = "content.name" + value = content["name"] + except Exception: + # If the event is missing a necessary field then + # skip over it. + continue + + event_search_rows.append((event_id, room_id, key, value)) + + if isinstance(self.database_engine, PostgresEngine): + sql = ( + "INSERT INTO event_search (event_id, room_id, key, vector)" + " VALUES (?,?,?,to_tsvector('english', ?))" + ) + elif isinstance(self.database_engine, Sqlite3Engine): + sql = ( + "INSERT INTO event_search (event_id, room_id, key, value)" + " VALUES (?,?,?,?)" + ) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE): + clump = event_search_rows[index : index + INSERT_CLUMP_SIZE) + txn.execute_many(sql, clump) + + progress = { + "target_max_stream_id": target_min_stream_id, + "max_stream_id": min_stream_id, + "rows_inserted": rows_inserted + len(event_search_rows) + } + + self._background_update_progress_txn( + txn, self.EVENT_SEARCH_UPDATE_NAME, progress + ) + + return len(event_search_rows) + + result = yield self.runInteration( + self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn + ) + + if result is None: + yield _end_background_update(self.EVENT_SEARCH_UPDATE_NAME) + + defer.returnValue(result) + @defer.inlineCallbacks def search_msgs(self, room_ids, search_term, keys): """Performs a full text search over events with given keys. |