diff options
author | Erik Johnston <erik@matrix.org> | 2016-04-21 16:41:39 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-04-21 16:56:14 +0100 |
commit | c877f0f0345f1ff6d329af2920d7d1a6b5659a86 (patch) | |
tree | 88c1372151b89c7ee5722627670eb3eb5a650f70 /synapse/storage/search.py | |
parent | pip install new python dependencies in jenkins.sh (diff) | |
download | synapse-c877f0f0345f1ff6d329af2920d7d1a6b5659a86.tar.xz |
Optimise event_search in postgres
Diffstat (limited to 'synapse/storage/search.py')
-rw-r--r-- | synapse/storage/search.py | 61 |
1 files changed, 60 insertions, 1 deletions
diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 59ac7f424c..375057fa3e 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -29,12 +29,17 @@ logger = logging.getLogger(__name__) class SearchStore(BackgroundUpdateStore): EVENT_SEARCH_UPDATE_NAME = "event_search" + EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order" def __init__(self, hs): super(SearchStore, self).__init__(hs) self.register_background_update_handler( self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search ) + self.register_background_update_handler( + self.EVENT_SEARCH_ORDER_UPDATE_NAME, + self._background_reindex_search_order + ) @defer.inlineCallbacks def _background_reindex_search(self, progress, batch_size): @@ -132,6 +137,61 @@ class SearchStore(BackgroundUpdateStore): defer.returnValue(result) @defer.inlineCallbacks + def _background_reindex_search_order(self, progress, batch_size): + target_min_stream_id = progress["target_min_stream_id_inclusive"] + max_stream_id = progress["max_stream_id_exclusive"] + rows_inserted = progress.get("rows_inserted", 0) + + INSERT_CLUMP_SIZE = 1000 + + def reindex_search_txn(txn): + sql = ( + "SELECT stream_ordering, origin_server_ts, event_id FROM events" + " INNER JOIN event_search USING (room_id, event_id)" + " WHERE ? <= stream_ordering AND stream_ordering < ?" + " ORDER BY stream_ordering DESC" + " LIMIT ?" + ) + + txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) + + rows = txn.fetchall() + if not rows: + return 0 + + min_stream_id = rows[-1][0] + + sql = ( + "UPDATE event_search SET stream_ordering = ?, origin_server_ts = ?" + " WHERE event_id = ?" + ) + + for index in range(0, len(rows), INSERT_CLUMP_SIZE): + clump = rows[index:index + INSERT_CLUMP_SIZE] + txn.executemany(sql, clump) + + progress = { + "target_min_stream_id_inclusive": target_min_stream_id, + "max_stream_id_exclusive": min_stream_id, + "rows_inserted": rows_inserted + len(rows) + } + + self._background_update_progress_txn( + txn, self.EVENT_SEARCH_ORDER_UPDATE_NAME, progress + ) + + return len(rows) + + result = yield self.runInteraction( + self.EVENT_SEARCH_ORDER_UPDATE_NAME, reindex_search_txn + ) + + if not result: + yield self._end_background_update(self.EVENT_SEARCH_ORDER_UPDATE_NAME) + + defer.returnValue(result) + + @defer.inlineCallbacks def search_msgs(self, room_ids, search_term, keys): """Performs a full text search over events with given keys. @@ -310,7 +370,6 @@ class SearchStore(BackgroundUpdateStore): "SELECT ts_rank_cd(vector, to_tsquery('english', ?)) as rank," " origin_server_ts, stream_ordering, room_id, event_id" " FROM event_search" - " NATURAL JOIN events" " WHERE vector @@ to_tsquery('english', ?) AND " ) args = [search_query, search_query] + args |