author    Erik Johnston <erik@matrix.org>  2019-04-17 19:44:40 +0100
committer Erik Johnston <erik@matrix.org>  2019-04-17 19:44:40 +0100
commit    ca90336a6935b36b5761244005b0f68b496d5d79 (patch)
tree      6bbce5eafc0db3b24ccc3b59b051da850382ae09 /synapse/storage/search.py
parent    Add management endpoints for account validity (diff)
parent    Merge pull request #5047 from matrix-org/babolivier/account_expiration (diff)
download  synapse-ca90336a6935b36b5761244005b0f68b496d5d79.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into babolivier/account_expiration
Diffstat (limited to 'synapse/storage/search.py')
-rw-r--r--  synapse/storage/search.py  197
1 file changed, 94 insertions, 103 deletions
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index c6420b2374..226f8f1b7e 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -30,10 +30,10 @@ from .background_updates import BackgroundUpdateStore
 
 logger = logging.getLogger(__name__)
 
-SearchEntry = namedtuple('SearchEntry', [
-    'key', 'value', 'event_id', 'room_id', 'stream_ordering',
-    'origin_server_ts',
-])
+SearchEntry = namedtuple(
+    'SearchEntry',
+    ['key', 'value', 'event_id', 'room_id', 'stream_ordering', 'origin_server_ts'],
+)
 
 
 class SearchStore(BackgroundUpdateStore):
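
For reference, a minimal sketch of constructing one of these rows. All field values below are hypothetical; only the field names come from the namedtuple above.

    # Hypothetical example row; the field names match SearchEntry above.
    entry = SearchEntry(
        key="content.body",
        value="hello world",
        event_id="$event_id:example.org",
        room_id="!room_id:example.org",
        stream_ordering=1234,
        origin_server_ts=1555500000000,
    )
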
@@ -53,8 +53,7 @@ class SearchStore(BackgroundUpdateStore):
             self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search
         )
         self.register_background_update_handler(
-            self.EVENT_SEARCH_ORDER_UPDATE_NAME,
-            self._background_reindex_search_order
+            self.EVENT_SEARCH_ORDER_UPDATE_NAME, self._background_reindex_search_order
         )
 
         # we used to have a background update to turn the GIN index into a
@@ -62,13 +61,10 @@ class SearchStore(BackgroundUpdateStore):
         # a GIN index. However, it's possible that some people might still have
         # the background update queued, so we register a handler to clear the
         # background update.
-        self.register_noop_background_update(
-            self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME,
-        )
+        self.register_noop_background_update(self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME)
 
         self.register_background_update_handler(
-            self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME,
-            self._background_reindex_gin_search
+            self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search
         )
 
     @defer.inlineCallbacks
@@ -138,21 +134,23 @@ class SearchStore(BackgroundUpdateStore):
                     # then skip over it
                     continue
 
-                event_search_rows.append(SearchEntry(
-                    key=key,
-                    value=value,
-                    event_id=event_id,
-                    room_id=room_id,
-                    stream_ordering=stream_ordering,
-                    origin_server_ts=origin_server_ts,
-                ))
+                event_search_rows.append(
+                    SearchEntry(
+                        key=key,
+                        value=value,
+                        event_id=event_id,
+                        room_id=room_id,
+                        stream_ordering=stream_ordering,
+                        origin_server_ts=origin_server_ts,
+                    )
+                )
 
             self.store_search_entries_txn(txn, event_search_rows)
 
             progress = {
                 "target_min_stream_id_inclusive": target_min_stream_id,
                 "max_stream_id_exclusive": min_stream_id,
-                "rows_inserted": rows_inserted + len(event_search_rows)
+                "rows_inserted": rows_inserted + len(event_search_rows),
             }
 
             self._background_update_progress_txn(
@@ -191,6 +189,7 @@ class SearchStore(BackgroundUpdateStore):
                 # doesn't support CREATE INDEX IF EXISTS so we just catch the
                 # exception and ignore it.
                 import psycopg2
+
                 try:
                     c.execute(
                         "CREATE INDEX CONCURRENTLY event_search_fts_idx"
@@ -198,14 +197,11 @@ class SearchStore(BackgroundUpdateStore):
                     )
                 except psycopg2.ProgrammingError as e:
                     logger.warn(
-                        "Ignoring error %r when trying to switch from GIST to GIN",
-                        e
+                        "Ignoring error %r when trying to switch from GIST to GIN", e
                     )
 
                 # we should now be able to delete the GIST index.
-                c.execute(
-                    "DROP INDEX IF EXISTS event_search_fts_idx_gist"
-                )
+                c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist")
             finally:
                 conn.set_session(autocommit=False)
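
The try/except above exists because, as the comment notes, CREATE INDEX IF EXISTS is unavailable here, so a failure (typically "already exists") is caught and ignored instead. A standalone sketch of the same pattern, with a hypothetical DSN and the index body assumed from the surrounding GIST-to-GIN comments:

    import psycopg2

    conn = psycopg2.connect("dbname=synapse")  # hypothetical DSN
    conn.set_session(autocommit=True)  # CONCURRENTLY cannot run inside a transaction
    try:
        c = conn.cursor()
        try:
            c.execute(
                "CREATE INDEX CONCURRENTLY event_search_fts_idx"
                " ON event_search USING GIN (vector)"  # index body assumed
            )
        except psycopg2.ProgrammingError as e:
            # e.g. the index already exists; ignore and carry on
            print("Ignoring error %r" % (e,))
        c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist")
    finally:
        conn.set_session(autocommit=False)
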
 
@@ -223,6 +219,7 @@ class SearchStore(BackgroundUpdateStore):
         have_added_index = progress['have_added_indexes']
 
         if not have_added_index:
+
             def create_index(conn):
                 conn.rollback()
                 conn.set_session(autocommit=True)
@@ -248,7 +245,8 @@ class SearchStore(BackgroundUpdateStore):
             yield self.runInteraction(
                 self.EVENT_SEARCH_ORDER_UPDATE_NAME,
                 self._background_update_progress_txn,
-                self.EVENT_SEARCH_ORDER_UPDATE_NAME, pg,
+                self.EVENT_SEARCH_ORDER_UPDATE_NAME,
+                pg,
             )
 
         def reindex_search_txn(txn):
@@ -302,14 +300,16 @@ class SearchStore(BackgroundUpdateStore):
         """
         self.store_search_entries_txn(
             txn,
-            (SearchEntry(
-                key=key,
-                value=value,
-                event_id=event.event_id,
-                room_id=event.room_id,
-                stream_ordering=event.internal_metadata.stream_ordering,
-                origin_server_ts=event.origin_server_ts,
-            ),),
+            (
+                SearchEntry(
+                    key=key,
+                    value=value,
+                    event_id=event.event_id,
+                    room_id=event.room_id,
+                    stream_ordering=event.internal_metadata.stream_ordering,
+                    origin_server_ts=event.origin_server_ts,
+                ),
+            ),
         )
 
     def store_search_entries_txn(self, txn, entries):
@@ -329,10 +329,17 @@ class SearchStore(BackgroundUpdateStore):
                 " VALUES (?,?,?,to_tsvector('english', ?),?,?)"
             )
 
-            args = ((
-                entry.event_id, entry.room_id, entry.key, entry.value,
-                entry.stream_ordering, entry.origin_server_ts,
-            ) for entry in entries)
+            args = (
+                (
+                    entry.event_id,
+                    entry.room_id,
+                    entry.key,
+                    entry.value,
+                    entry.stream_ordering,
+                    entry.origin_server_ts,
+                )
+                for entry in entries
+            )
 
             # inserts to a GIN index are normally batched up into a pending
             # list, and then all committed together once the list gets to a
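
The truncated comment above refers to the GIN pending list: Postgres buffers GIN index insertions in a pending list and flushes them in bulk. Before 9.5 the flush threshold comes from work_mem; from 9.5 onwards it comes from gin_pending_list_limit. A hedged sketch of capping it for the current session, assuming txn is a database cursor:

    # Hypothetical: shrink this session's GIN pending-list threshold so
    # batched inserts get flushed more often (9.5+ setting; older
    # Postgres sizes the pending list from work_mem instead).
    txn.execute("SET gin_pending_list_limit = '128kB'")
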
@@ -363,9 +370,10 @@ class SearchStore(BackgroundUpdateStore):
                 "INSERT INTO event_search (event_id, room_id, key, value)"
                 " VALUES (?,?,?,?)"
             )
-            args = ((
-                entry.event_id, entry.room_id, entry.key, entry.value,
-            ) for entry in entries)
+            args = (
+                (entry.event_id, entry.room_id, entry.key, entry.value)
+                for entry in entries
+            )
 
             txn.executemany(sql, args)
         else:
@@ -394,9 +402,7 @@ class SearchStore(BackgroundUpdateStore):
         # Make sure we don't explode because the person is in too many rooms.
         # We filter the results below regardless.
         if len(room_ids) < 500:
-            clauses.append(
-                "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),)
-            )
+            clauses.append("room_id IN (%s)" % (",".join(["?"] * len(room_ids)),))
             args.extend(room_ids)
 
         local_clauses = []
@@ -404,9 +410,7 @@ class SearchStore(BackgroundUpdateStore):
             local_clauses.append("key = ?")
             args.append(key)
 
-        clauses.append(
-            "(%s)" % (" OR ".join(local_clauses),)
-        )
+        clauses.append("(%s)" % (" OR ".join(local_clauses),))
 
         count_args = args
         count_clauses = clauses
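
The join over "?" placeholders above is the standard way to build a parameterised IN clause; in isolation:

    # Sketch of the placeholder-list idiom used above (values hypothetical).
    room_ids = ["!a:example.org", "!b:example.org"]
    clause = "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),)
    # clause == "room_id IN (?,?)"; room_ids are then appended to args
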
@@ -452,18 +456,13 @@ class SearchStore(BackgroundUpdateStore):
         # entire table from the database.
         sql += " ORDER BY rank DESC LIMIT 500"
 
-        results = yield self._execute(
-            "search_msgs", self.cursor_to_dict, sql, *args
-        )
+        results = yield self._execute("search_msgs", self.cursor_to_dict, sql, *args)
 
         results = list(filter(lambda row: row["room_id"] in room_ids, results))
 
         events = yield self._get_events([r["event_id"] for r in results])
 
-        event_map = {
-            ev.event_id: ev
-            for ev in events
-        }
+        event_map = {ev.event_id: ev for ev in events}
 
         highlights = None
         if isinstance(self.database_engine, PostgresEngine):
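
The LIMIT added above caps how much the ranked query can return. Conceptually, the Postgres query being assembled resembles the following hedged illustration; the exact SQL in this file is not visible in the diff:

    # Hypothetical shape of the rank-ordered full-text query built above.
    sql = (
        "SELECT room_id, event_id,"
        " ts_rank_cd(vector, to_tsquery('english', ?)) AS rank"
        " FROM event_search"
        " WHERE vector @@ to_tsquery('english', ?)"
    )
    sql += " ORDER BY rank DESC LIMIT 500"
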
@@ -477,18 +476,17 @@ class SearchStore(BackgroundUpdateStore):
 
         count = sum(row["count"] for row in count_results if row["room_id"] in room_ids)
 
-        defer.returnValue({
-            "results": [
-                {
-                    "event": event_map[r["event_id"]],
-                    "rank": r["rank"],
-                }
-                for r in results
-                if r["event_id"] in event_map
-            ],
-            "highlights": highlights,
-            "count": count,
-        })
+        defer.returnValue(
+            {
+                "results": [
+                    {"event": event_map[r["event_id"]], "rank": r["rank"]}
+                    for r in results
+                    if r["event_id"] in event_map
+                ],
+                "highlights": highlights,
+                "count": count,
+            }
+        )
 
     @defer.inlineCallbacks
     def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None):
@@ -513,9 +511,7 @@ class SearchStore(BackgroundUpdateStore):
         # Make sure we don't explode because the person is in too many rooms.
         # We filter the results below regardless.
         if len(room_ids) < 500:
-            clauses.append(
-                "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),)
-            )
+            clauses.append("room_id IN (%s)" % (",".join(["?"] * len(room_ids)),))
             args.extend(room_ids)
 
         local_clauses = []
@@ -523,9 +519,7 @@ class SearchStore(BackgroundUpdateStore):
             local_clauses.append("key = ?")
             args.append(key)
 
-        clauses.append(
-            "(%s)" % (" OR ".join(local_clauses),)
-        )
+        clauses.append("(%s)" % (" OR ".join(local_clauses),))
 
         # take copies of the current args and clauses lists, before adding
         # pagination clauses to main query.
@@ -607,18 +601,13 @@ class SearchStore(BackgroundUpdateStore):
 
         args.append(limit)
 
-        results = yield self._execute(
-            "search_rooms", self.cursor_to_dict, sql, *args
-        )
+        results = yield self._execute("search_rooms", self.cursor_to_dict, sql, *args)
 
         results = list(filter(lambda row: row["room_id"] in room_ids, results))
 
         events = yield self._get_events([r["event_id"] for r in results])
 
-        event_map = {
-            ev.event_id: ev
-            for ev in events
-        }
+        event_map = {ev.event_id: ev for ev in events}
 
         highlights = None
         if isinstance(self.database_engine, PostgresEngine):
@@ -632,21 +621,22 @@ class SearchStore(BackgroundUpdateStore):
 
         count = sum(row["count"] for row in count_results if row["room_id"] in room_ids)
 
-        defer.returnValue({
-            "results": [
-                {
-                    "event": event_map[r["event_id"]],
-                    "rank": r["rank"],
-                    "pagination_token": "%s,%s" % (
-                        r["origin_server_ts"], r["stream_ordering"]
-                    ),
-                }
-                for r in results
-                if r["event_id"] in event_map
-            ],
-            "highlights": highlights,
-            "count": count,
-        })
+        defer.returnValue(
+            {
+                "results": [
+                    {
+                        "event": event_map[r["event_id"]],
+                        "rank": r["rank"],
+                        "pagination_token": "%s,%s"
+                        % (r["origin_server_ts"], r["stream_ordering"]),
+                    }
+                    for r in results
+                    if r["event_id"] in event_map
+                ],
+                "highlights": highlights,
+                "count": count,
+            }
+        )
 
     def _find_highlights_in_postgres(self, search_query, events):
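
The pagination_token emitted above is simply origin_server_ts and stream_ordering joined with a comma; a hedged round-trip sketch with hypothetical values:

    # Round-trip of the "ts,ordering" token format used above.
    token = "%s,%s" % (1555500000000, 1234)
    origin_server_ts, stream_ordering = map(int, token.split(","))
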
         """Given a list of events and a search term, return a list of words
@@ -662,6 +652,7 @@ class SearchStore(BackgroundUpdateStore):
         Returns:
             deferred : A set of strings.
         """
+
         def f(txn):
             highlight_words = set()
             for event in events:
@@ -689,13 +680,15 @@ class SearchStore(BackgroundUpdateStore):
                     stop_sel += ">"
 
                 query = "SELECT ts_headline(?, to_tsquery('english', ?), %s)" % (
-                    _to_postgres_options({
-                        "StartSel": start_sel,
-                        "StopSel": stop_sel,
-                        "MaxFragments": "50",
-                    })
+                    _to_postgres_options(
+                        {
+                            "StartSel": start_sel,
+                            "StopSel": stop_sel,
+                            "MaxFragments": "50",
+                        }
+                    )
                 )
-                txn.execute(query, (value, search_query,))
+                txn.execute(query, (value, search_query))
                 headline, = txn.fetchall()[0]
 
                 # Now we need to pick the possible highlights out of the headline
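
ts_headline returns the document with matches wrapped in the configured StartSel/StopSel markers. A standalone sketch of the call assembled above, with hypothetical input values and the options string written out literally:

    # Hypothetical invocation of the ts_headline query built above.
    query = (
        "SELECT ts_headline(?, to_tsquery('english', ?),"
        " 'StartSel=<b>,StopSel=</b>,MaxFragments=50')"
    )
    txn.execute(query, ("some message body", "body"))
    (headline,) = txn.fetchall()[0]
    # e.g. headline == "some message <b>body</b>"
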
@@ -714,9 +707,7 @@ class SearchStore(BackgroundUpdateStore):
 
 
 def _to_postgres_options(options_dict):
-    return "'%s'" % (
-        ",".join("%s=%s" % (k, v) for k, v in options_dict.items()),
-    )
+    return "'%s'" % (",".join("%s=%s" % (k, v) for k, v in options_dict.items()),)
 
 
 def _parse_query(database_engine, search_term):
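
A usage sketch of _to_postgres_options above; the output, including the outer single quotes, is shown in the comment (dict insertion order assumed):

    _to_postgres_options({"StartSel": "<b>", "StopSel": "</b>"})
    # -> "'StartSel=<b>,StopSel=</b>'"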