diff options
author | Erik Johnston <erik@matrix.org> | 2019-04-17 19:44:40 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2019-04-17 19:44:40 +0100 |
commit | ca90336a6935b36b5761244005b0f68b496d5d79 (patch) | |
tree | 6bbce5eafc0db3b24ccc3b59b051da850382ae09 /synapse/storage/search.py | |
parent | Add management endpoints for account validity (diff) | |
parent | Merge pull request #5047 from matrix-org/babolivier/account_expiration (diff) | |
download | synapse-ca90336a6935b36b5761244005b0f68b496d5d79.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into babolivier/account_expiration
Diffstat (limited to 'synapse/storage/search.py')
-rw-r--r-- | synapse/storage/search.py | 197 |
1 files changed, 94 insertions, 103 deletions
diff --git a/synapse/storage/search.py b/synapse/storage/search.py index c6420b2374..226f8f1b7e 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -30,10 +30,10 @@ from .background_updates import BackgroundUpdateStore logger = logging.getLogger(__name__) -SearchEntry = namedtuple('SearchEntry', [ - 'key', 'value', 'event_id', 'room_id', 'stream_ordering', - 'origin_server_ts', -]) +SearchEntry = namedtuple( + 'SearchEntry', + ['key', 'value', 'event_id', 'room_id', 'stream_ordering', 'origin_server_ts'], +) class SearchStore(BackgroundUpdateStore): @@ -53,8 +53,7 @@ class SearchStore(BackgroundUpdateStore): self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search ) self.register_background_update_handler( - self.EVENT_SEARCH_ORDER_UPDATE_NAME, - self._background_reindex_search_order + self.EVENT_SEARCH_ORDER_UPDATE_NAME, self._background_reindex_search_order ) # we used to have a background update to turn the GIN index into a @@ -62,13 +61,10 @@ class SearchStore(BackgroundUpdateStore): # a GIN index. However, it's possible that some people might still have # the background update queued, so we register a handler to clear the # background update. - self.register_noop_background_update( - self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME, - ) + self.register_noop_background_update(self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME) self.register_background_update_handler( - self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, - self._background_reindex_gin_search + self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search ) @defer.inlineCallbacks @@ -138,21 +134,23 @@ class SearchStore(BackgroundUpdateStore): # then skip over it continue - event_search_rows.append(SearchEntry( - key=key, - value=value, - event_id=event_id, - room_id=room_id, - stream_ordering=stream_ordering, - origin_server_ts=origin_server_ts, - )) + event_search_rows.append( + SearchEntry( + key=key, + value=value, + event_id=event_id, + room_id=room_id, + stream_ordering=stream_ordering, + origin_server_ts=origin_server_ts, + ) + ) self.store_search_entries_txn(txn, event_search_rows) progress = { "target_min_stream_id_inclusive": target_min_stream_id, "max_stream_id_exclusive": min_stream_id, - "rows_inserted": rows_inserted + len(event_search_rows) + "rows_inserted": rows_inserted + len(event_search_rows), } self._background_update_progress_txn( @@ -191,6 +189,7 @@ class SearchStore(BackgroundUpdateStore): # doesn't support CREATE INDEX IF EXISTS so we just catch the # exception and ignore it. import psycopg2 + try: c.execute( "CREATE INDEX CONCURRENTLY event_search_fts_idx" @@ -198,14 +197,11 @@ class SearchStore(BackgroundUpdateStore): ) except psycopg2.ProgrammingError as e: logger.warn( - "Ignoring error %r when trying to switch from GIST to GIN", - e + "Ignoring error %r when trying to switch from GIST to GIN", e ) # we should now be able to delete the GIST index. - c.execute( - "DROP INDEX IF EXISTS event_search_fts_idx_gist" - ) + c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist") finally: conn.set_session(autocommit=False) @@ -223,6 +219,7 @@ class SearchStore(BackgroundUpdateStore): have_added_index = progress['have_added_indexes'] if not have_added_index: + def create_index(conn): conn.rollback() conn.set_session(autocommit=True) @@ -248,7 +245,8 @@ class SearchStore(BackgroundUpdateStore): yield self.runInteraction( self.EVENT_SEARCH_ORDER_UPDATE_NAME, self._background_update_progress_txn, - self.EVENT_SEARCH_ORDER_UPDATE_NAME, pg, + self.EVENT_SEARCH_ORDER_UPDATE_NAME, + pg, ) def reindex_search_txn(txn): @@ -302,14 +300,16 @@ class SearchStore(BackgroundUpdateStore): """ self.store_search_entries_txn( txn, - (SearchEntry( - key=key, - value=value, - event_id=event.event_id, - room_id=event.room_id, - stream_ordering=event.internal_metadata.stream_ordering, - origin_server_ts=event.origin_server_ts, - ),), + ( + SearchEntry( + key=key, + value=value, + event_id=event.event_id, + room_id=event.room_id, + stream_ordering=event.internal_metadata.stream_ordering, + origin_server_ts=event.origin_server_ts, + ), + ), ) def store_search_entries_txn(self, txn, entries): @@ -329,10 +329,17 @@ class SearchStore(BackgroundUpdateStore): " VALUES (?,?,?,to_tsvector('english', ?),?,?)" ) - args = (( - entry.event_id, entry.room_id, entry.key, entry.value, - entry.stream_ordering, entry.origin_server_ts, - ) for entry in entries) + args = ( + ( + entry.event_id, + entry.room_id, + entry.key, + entry.value, + entry.stream_ordering, + entry.origin_server_ts, + ) + for entry in entries + ) # inserts to a GIN index are normally batched up into a pending # list, and then all committed together once the list gets to a @@ -363,9 +370,10 @@ class SearchStore(BackgroundUpdateStore): "INSERT INTO event_search (event_id, room_id, key, value)" " VALUES (?,?,?,?)" ) - args = (( - entry.event_id, entry.room_id, entry.key, entry.value, - ) for entry in entries) + args = ( + (entry.event_id, entry.room_id, entry.key, entry.value) + for entry in entries + ) txn.executemany(sql, args) else: @@ -394,9 +402,7 @@ class SearchStore(BackgroundUpdateStore): # Make sure we don't explode because the person is in too many rooms. # We filter the results below regardless. if len(room_ids) < 500: - clauses.append( - "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),) - ) + clauses.append("room_id IN (%s)" % (",".join(["?"] * len(room_ids)),)) args.extend(room_ids) local_clauses = [] @@ -404,9 +410,7 @@ class SearchStore(BackgroundUpdateStore): local_clauses.append("key = ?") args.append(key) - clauses.append( - "(%s)" % (" OR ".join(local_clauses),) - ) + clauses.append("(%s)" % (" OR ".join(local_clauses),)) count_args = args count_clauses = clauses @@ -452,18 +456,13 @@ class SearchStore(BackgroundUpdateStore): # entire table from the database. sql += " ORDER BY rank DESC LIMIT 500" - results = yield self._execute( - "search_msgs", self.cursor_to_dict, sql, *args - ) + results = yield self._execute("search_msgs", self.cursor_to_dict, sql, *args) results = list(filter(lambda row: row["room_id"] in room_ids, results)) events = yield self._get_events([r["event_id"] for r in results]) - event_map = { - ev.event_id: ev - for ev in events - } + event_map = {ev.event_id: ev for ev in events} highlights = None if isinstance(self.database_engine, PostgresEngine): @@ -477,18 +476,17 @@ class SearchStore(BackgroundUpdateStore): count = sum(row["count"] for row in count_results if row["room_id"] in room_ids) - defer.returnValue({ - "results": [ - { - "event": event_map[r["event_id"]], - "rank": r["rank"], - } - for r in results - if r["event_id"] in event_map - ], - "highlights": highlights, - "count": count, - }) + defer.returnValue( + { + "results": [ + {"event": event_map[r["event_id"]], "rank": r["rank"]} + for r in results + if r["event_id"] in event_map + ], + "highlights": highlights, + "count": count, + } + ) @defer.inlineCallbacks def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None): @@ -513,9 +511,7 @@ class SearchStore(BackgroundUpdateStore): # Make sure we don't explode because the person is in too many rooms. # We filter the results below regardless. if len(room_ids) < 500: - clauses.append( - "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),) - ) + clauses.append("room_id IN (%s)" % (",".join(["?"] * len(room_ids)),)) args.extend(room_ids) local_clauses = [] @@ -523,9 +519,7 @@ class SearchStore(BackgroundUpdateStore): local_clauses.append("key = ?") args.append(key) - clauses.append( - "(%s)" % (" OR ".join(local_clauses),) - ) + clauses.append("(%s)" % (" OR ".join(local_clauses),)) # take copies of the current args and clauses lists, before adding # pagination clauses to main query. @@ -607,18 +601,13 @@ class SearchStore(BackgroundUpdateStore): args.append(limit) - results = yield self._execute( - "search_rooms", self.cursor_to_dict, sql, *args - ) + results = yield self._execute("search_rooms", self.cursor_to_dict, sql, *args) results = list(filter(lambda row: row["room_id"] in room_ids, results)) events = yield self._get_events([r["event_id"] for r in results]) - event_map = { - ev.event_id: ev - for ev in events - } + event_map = {ev.event_id: ev for ev in events} highlights = None if isinstance(self.database_engine, PostgresEngine): @@ -632,21 +621,22 @@ class SearchStore(BackgroundUpdateStore): count = sum(row["count"] for row in count_results if row["room_id"] in room_ids) - defer.returnValue({ - "results": [ - { - "event": event_map[r["event_id"]], - "rank": r["rank"], - "pagination_token": "%s,%s" % ( - r["origin_server_ts"], r["stream_ordering"] - ), - } - for r in results - if r["event_id"] in event_map - ], - "highlights": highlights, - "count": count, - }) + defer.returnValue( + { + "results": [ + { + "event": event_map[r["event_id"]], + "rank": r["rank"], + "pagination_token": "%s,%s" + % (r["origin_server_ts"], r["stream_ordering"]), + } + for r in results + if r["event_id"] in event_map + ], + "highlights": highlights, + "count": count, + } + ) def _find_highlights_in_postgres(self, search_query, events): """Given a list of events and a search term, return a list of words @@ -662,6 +652,7 @@ class SearchStore(BackgroundUpdateStore): Returns: deferred : A set of strings. """ + def f(txn): highlight_words = set() for event in events: @@ -689,13 +680,15 @@ class SearchStore(BackgroundUpdateStore): stop_sel += ">" query = "SELECT ts_headline(?, to_tsquery('english', ?), %s)" % ( - _to_postgres_options({ - "StartSel": start_sel, - "StopSel": stop_sel, - "MaxFragments": "50", - }) + _to_postgres_options( + { + "StartSel": start_sel, + "StopSel": stop_sel, + "MaxFragments": "50", + } + ) ) - txn.execute(query, (value, search_query,)) + txn.execute(query, (value, search_query)) headline, = txn.fetchall()[0] # Now we need to pick the possible highlights out of the haedline @@ -714,9 +707,7 @@ class SearchStore(BackgroundUpdateStore): def _to_postgres_options(options_dict): - return "'%s'" % ( - ",".join("%s=%s" % (k, v) for k, v in options_dict.items()), - ) + return "'%s'" % (",".join("%s=%s" % (k, v) for k, v in options_dict.items()),) def _parse_query(database_engine, search_term): |