diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index c6420b2374..226f8f1b7e 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -30,10 +30,10 @@ from .background_updates import BackgroundUpdateStore
logger = logging.getLogger(__name__)
-SearchEntry = namedtuple('SearchEntry', [
- 'key', 'value', 'event_id', 'room_id', 'stream_ordering',
- 'origin_server_ts',
-])
+SearchEntry = namedtuple(
+ 'SearchEntry',
+ ['key', 'value', 'event_id', 'room_id', 'stream_ordering', 'origin_server_ts'],
+)
class SearchStore(BackgroundUpdateStore):
@@ -53,8 +53,7 @@ class SearchStore(BackgroundUpdateStore):
self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search
)
self.register_background_update_handler(
- self.EVENT_SEARCH_ORDER_UPDATE_NAME,
- self._background_reindex_search_order
+ self.EVENT_SEARCH_ORDER_UPDATE_NAME, self._background_reindex_search_order
)
# we used to have a background update to turn the GIN index into a
@@ -62,13 +61,10 @@ class SearchStore(BackgroundUpdateStore):
# a GIN index. However, it's possible that some people might still have
# the background update queued, so we register a handler to clear the
# background update.
- self.register_noop_background_update(
- self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME,
- )
+ self.register_noop_background_update(self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME)
self.register_background_update_handler(
- self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME,
- self._background_reindex_gin_search
+ self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search
)
@defer.inlineCallbacks
@@ -138,21 +134,23 @@ class SearchStore(BackgroundUpdateStore):
# then skip over it
continue
- event_search_rows.append(SearchEntry(
- key=key,
- value=value,
- event_id=event_id,
- room_id=room_id,
- stream_ordering=stream_ordering,
- origin_server_ts=origin_server_ts,
- ))
+ event_search_rows.append(
+ SearchEntry(
+ key=key,
+ value=value,
+ event_id=event_id,
+ room_id=room_id,
+ stream_ordering=stream_ordering,
+ origin_server_ts=origin_server_ts,
+ )
+ )
self.store_search_entries_txn(txn, event_search_rows)
progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
"max_stream_id_exclusive": min_stream_id,
- "rows_inserted": rows_inserted + len(event_search_rows)
+ "rows_inserted": rows_inserted + len(event_search_rows),
}
self._background_update_progress_txn(
@@ -191,6 +189,7 @@ class SearchStore(BackgroundUpdateStore):
# doesn't support CREATE INDEX IF EXISTS so we just catch the
# exception and ignore it.
import psycopg2
+
try:
c.execute(
"CREATE INDEX CONCURRENTLY event_search_fts_idx"
@@ -198,14 +197,11 @@ class SearchStore(BackgroundUpdateStore):
)
except psycopg2.ProgrammingError as e:
logger.warn(
- "Ignoring error %r when trying to switch from GIST to GIN",
- e
+ "Ignoring error %r when trying to switch from GIST to GIN", e
)
# we should now be able to delete the GIST index.
- c.execute(
- "DROP INDEX IF EXISTS event_search_fts_idx_gist"
- )
+ c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist")
finally:
conn.set_session(autocommit=False)
@@ -223,6 +219,7 @@ class SearchStore(BackgroundUpdateStore):
have_added_index = progress['have_added_indexes']
if not have_added_index:
+
def create_index(conn):
conn.rollback()
conn.set_session(autocommit=True)
@@ -248,7 +245,8 @@ class SearchStore(BackgroundUpdateStore):
yield self.runInteraction(
self.EVENT_SEARCH_ORDER_UPDATE_NAME,
self._background_update_progress_txn,
- self.EVENT_SEARCH_ORDER_UPDATE_NAME, pg,
+ self.EVENT_SEARCH_ORDER_UPDATE_NAME,
+ pg,
)
def reindex_search_txn(txn):
@@ -302,14 +300,16 @@ class SearchStore(BackgroundUpdateStore):
"""
self.store_search_entries_txn(
txn,
- (SearchEntry(
- key=key,
- value=value,
- event_id=event.event_id,
- room_id=event.room_id,
- stream_ordering=event.internal_metadata.stream_ordering,
- origin_server_ts=event.origin_server_ts,
- ),),
+ (
+ SearchEntry(
+ key=key,
+ value=value,
+ event_id=event.event_id,
+ room_id=event.room_id,
+ stream_ordering=event.internal_metadata.stream_ordering,
+ origin_server_ts=event.origin_server_ts,
+ ),
+ ),
)
def store_search_entries_txn(self, txn, entries):
@@ -329,10 +329,17 @@ class SearchStore(BackgroundUpdateStore):
" VALUES (?,?,?,to_tsvector('english', ?),?,?)"
)
- args = ((
- entry.event_id, entry.room_id, entry.key, entry.value,
- entry.stream_ordering, entry.origin_server_ts,
- ) for entry in entries)
+ args = (
+ (
+ entry.event_id,
+ entry.room_id,
+ entry.key,
+ entry.value,
+ entry.stream_ordering,
+ entry.origin_server_ts,
+ )
+ for entry in entries
+ )
# inserts to a GIN index are normally batched up into a pending
# list, and then all committed together once the list gets to a
@@ -363,9 +370,10 @@ class SearchStore(BackgroundUpdateStore):
"INSERT INTO event_search (event_id, room_id, key, value)"
" VALUES (?,?,?,?)"
)
- args = ((
- entry.event_id, entry.room_id, entry.key, entry.value,
- ) for entry in entries)
+ args = (
+ (entry.event_id, entry.room_id, entry.key, entry.value)
+ for entry in entries
+ )
txn.executemany(sql, args)
else:
@@ -394,9 +402,7 @@ class SearchStore(BackgroundUpdateStore):
# Make sure we don't explode because the person is in too many rooms.
# We filter the results below regardless.
if len(room_ids) < 500:
- clauses.append(
- "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),)
- )
+ clauses.append("room_id IN (%s)" % (",".join(["?"] * len(room_ids)),))
args.extend(room_ids)
local_clauses = []
@@ -404,9 +410,7 @@ class SearchStore(BackgroundUpdateStore):
local_clauses.append("key = ?")
args.append(key)
- clauses.append(
- "(%s)" % (" OR ".join(local_clauses),)
- )
+ clauses.append("(%s)" % (" OR ".join(local_clauses),))
count_args = args
count_clauses = clauses
@@ -452,18 +456,13 @@ class SearchStore(BackgroundUpdateStore):
# entire table from the database.
sql += " ORDER BY rank DESC LIMIT 500"
- results = yield self._execute(
- "search_msgs", self.cursor_to_dict, sql, *args
- )
+ results = yield self._execute("search_msgs", self.cursor_to_dict, sql, *args)
results = list(filter(lambda row: row["room_id"] in room_ids, results))
events = yield self._get_events([r["event_id"] for r in results])
- event_map = {
- ev.event_id: ev
- for ev in events
- }
+ event_map = {ev.event_id: ev for ev in events}
highlights = None
if isinstance(self.database_engine, PostgresEngine):
@@ -477,18 +476,17 @@ class SearchStore(BackgroundUpdateStore):
count = sum(row["count"] for row in count_results if row["room_id"] in room_ids)
- defer.returnValue({
- "results": [
- {
- "event": event_map[r["event_id"]],
- "rank": r["rank"],
- }
- for r in results
- if r["event_id"] in event_map
- ],
- "highlights": highlights,
- "count": count,
- })
+ defer.returnValue(
+ {
+ "results": [
+ {"event": event_map[r["event_id"]], "rank": r["rank"]}
+ for r in results
+ if r["event_id"] in event_map
+ ],
+ "highlights": highlights,
+ "count": count,
+ }
+ )
@defer.inlineCallbacks
def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None):
@@ -513,9 +511,7 @@ class SearchStore(BackgroundUpdateStore):
# Make sure we don't explode because the person is in too many rooms.
# We filter the results below regardless.
if len(room_ids) < 500:
- clauses.append(
- "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),)
- )
+ clauses.append("room_id IN (%s)" % (",".join(["?"] * len(room_ids)),))
args.extend(room_ids)
local_clauses = []
@@ -523,9 +519,7 @@ class SearchStore(BackgroundUpdateStore):
local_clauses.append("key = ?")
args.append(key)
- clauses.append(
- "(%s)" % (" OR ".join(local_clauses),)
- )
+ clauses.append("(%s)" % (" OR ".join(local_clauses),))
# take copies of the current args and clauses lists, before adding
# pagination clauses to main query.
@@ -607,18 +601,13 @@ class SearchStore(BackgroundUpdateStore):
args.append(limit)
- results = yield self._execute(
- "search_rooms", self.cursor_to_dict, sql, *args
- )
+ results = yield self._execute("search_rooms", self.cursor_to_dict, sql, *args)
results = list(filter(lambda row: row["room_id"] in room_ids, results))
events = yield self._get_events([r["event_id"] for r in results])
- event_map = {
- ev.event_id: ev
- for ev in events
- }
+ event_map = {ev.event_id: ev for ev in events}
highlights = None
if isinstance(self.database_engine, PostgresEngine):
@@ -632,21 +621,22 @@ class SearchStore(BackgroundUpdateStore):
count = sum(row["count"] for row in count_results if row["room_id"] in room_ids)
- defer.returnValue({
- "results": [
- {
- "event": event_map[r["event_id"]],
- "rank": r["rank"],
- "pagination_token": "%s,%s" % (
- r["origin_server_ts"], r["stream_ordering"]
- ),
- }
- for r in results
- if r["event_id"] in event_map
- ],
- "highlights": highlights,
- "count": count,
- })
+ defer.returnValue(
+ {
+ "results": [
+ {
+ "event": event_map[r["event_id"]],
+ "rank": r["rank"],
+ "pagination_token": "%s,%s"
+ % (r["origin_server_ts"], r["stream_ordering"]),
+ }
+ for r in results
+ if r["event_id"] in event_map
+ ],
+ "highlights": highlights,
+ "count": count,
+ }
+ )
def _find_highlights_in_postgres(self, search_query, events):
"""Given a list of events and a search term, return a list of words
@@ -662,6 +652,7 @@ class SearchStore(BackgroundUpdateStore):
Returns:
deferred : A set of strings.
"""
+
def f(txn):
highlight_words = set()
for event in events:
@@ -689,13 +680,15 @@ class SearchStore(BackgroundUpdateStore):
stop_sel += ">"
query = "SELECT ts_headline(?, to_tsquery('english', ?), %s)" % (
- _to_postgres_options({
- "StartSel": start_sel,
- "StopSel": stop_sel,
- "MaxFragments": "50",
- })
+ _to_postgres_options(
+ {
+ "StartSel": start_sel,
+ "StopSel": stop_sel,
+ "MaxFragments": "50",
+ }
+ )
)
- txn.execute(query, (value, search_query,))
+ txn.execute(query, (value, search_query))
headline, = txn.fetchall()[0]
            # Now we need to pick the possible highlights out of the headline
@@ -714,9 +707,7 @@ class SearchStore(BackgroundUpdateStore):
def _to_postgres_options(options_dict):
- return "'%s'" % (
- ",".join("%s=%s" % (k, v) for k, v in options_dict.items()),
- )
+ return "'%s'" % (",".join("%s=%s" % (k, v) for k, v in options_dict.items()),)
def _parse_query(database_engine, search_term):