diff options
author | Steven Hammerton <steven.hammerton@openmarket.com> | 2015-11-17 10:55:41 +0000 |
---|---|---|
committer | Steven Hammerton <steven.hammerton@openmarket.com> | 2015-11-17 10:55:41 +0000 |
commit | f5e25c5f358de93f8bbba83496a44cf217ce086a (patch) | |
tree | c01c8462aa3a004e103de4befc45b7f9206439b1 /synapse/storage/search.py | |
parent | Snakes not camels (diff) | |
parent | Merge pull request #376 from matrix-org/daniel/jenkins (diff) | |
download | synapse-f5e25c5f358de93f8bbba83496a44cf217ce086a.tar.xz |
Merge branch 'develop' into sh-cas-auth-via-homeserver
Diffstat (limited to 'synapse/storage/search.py')
-rw-r--r-- | synapse/storage/search.py | 120 |
1 files changed, 115 insertions, 5 deletions
diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 3cea2011fa..380270b009 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -15,7 +15,7 @@ from twisted.internet import defer -from _base import SQLBaseStore +from .background_updates import BackgroundUpdateStore from synapse.api.errors import SynapseError from synapse.storage.engines import PostgresEngine, Sqlite3Engine @@ -25,7 +25,106 @@ import logging logger = logging.getLogger(__name__) -class SearchStore(SQLBaseStore): +class SearchStore(BackgroundUpdateStore): + + EVENT_SEARCH_UPDATE_NAME = "event_search" + + def __init__(self, hs): + super(SearchStore, self).__init__(hs) + self.register_background_update_handler( + self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search + ) + + @defer.inlineCallbacks + def _background_reindex_search(self, progress, batch_size): + target_min_stream_id = progress["target_min_stream_id_inclusive"] + max_stream_id = progress["max_stream_id_exclusive"] + rows_inserted = progress.get("rows_inserted", 0) + + INSERT_CLUMP_SIZE = 1000 + TYPES = ["m.room.name", "m.room.message", "m.room.topic"] + + def reindex_search_txn(txn): + sql = ( + "SELECT stream_ordering, event_id FROM events" + " WHERE ? <= stream_ordering AND stream_ordering < ?" + " AND (%s)" + " ORDER BY stream_ordering DESC" + " LIMIT ?" + ) % (" OR ".join("type = '%s'" % (t,) for t in TYPES),) + + txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) + + rows = txn.fetchall() + if not rows: + return 0 + + min_stream_id = rows[-1][0] + event_ids = [row[1] for row in rows] + + events = self._get_events_txn(txn, event_ids) + + event_search_rows = [] + for event in events: + try: + event_id = event.event_id + room_id = event.room_id + content = event.content + if event.type == "m.room.message": + key = "content.body" + value = content["body"] + elif event.type == "m.room.topic": + key = "content.topic" + value = content["topic"] + elif event.type == "m.room.name": + key = "content.name" + value = content["name"] + except (KeyError, AttributeError): + # If the event is missing a necessary field then + # skip over it. + continue + + event_search_rows.append((event_id, room_id, key, value)) + + if isinstance(self.database_engine, PostgresEngine): + sql = ( + "INSERT INTO event_search (event_id, room_id, key, vector)" + " VALUES (?,?,?,to_tsvector('english', ?))" + ) + elif isinstance(self.database_engine, Sqlite3Engine): + sql = ( + "INSERT INTO event_search (event_id, room_id, key, value)" + " VALUES (?,?,?,?)" + ) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE): + clump = event_search_rows[index:index + INSERT_CLUMP_SIZE] + txn.executemany(sql, clump) + + progress = { + "target_min_stream_id_inclusive": target_min_stream_id, + "max_stream_id_exclusive": min_stream_id, + "rows_inserted": rows_inserted + len(event_search_rows) + } + + self._background_update_progress_txn( + txn, self.EVENT_SEARCH_UPDATE_NAME, progress + ) + + return len(event_search_rows) + + result = yield self.runInteraction( + self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn + ) + + if not result: + yield self._end_background_update(self.EVENT_SEARCH_UPDATE_NAME) + + defer.returnValue(result) + @defer.inlineCallbacks def search_msgs(self, room_ids, search_term, keys): """Performs a full text search over events with given keys. @@ -153,12 +252,23 @@ class SearchStore(SQLBaseStore): " WHERE vector @@ query AND room_id = ?" ) elif isinstance(self.database_engine, Sqlite3Engine): + # We use CROSS JOIN here to ensure we use the right indexes. + # https://sqlite.org/optoverview.html#crossjoin + # + # We want to use the full text search index on event_search to + # extract all possible matches first, then lookup those matches + # in the events table to get the topological ordering. We need + # to use the indexes in this order because sqlite refuses to + # MATCH unless it uses the full text search index sql = ( - "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id" + "SELECT rank(matchinfo) as rank, room_id, event_id," " topological_ordering, stream_ordering" + " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo" " FROM event_search" - " NATURAL JOIN events" - " WHERE value MATCH ? AND room_id = ?" + " WHERE value MATCH ?" + " )" + " CROSS JOIN events USING (event_id)" + " WHERE room_id = ?" ) else: # This should be unreachable. |