summary refs log tree commit diff
path: root/synapse/storage/databases/main/search.py
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2020-08-07 12:17:17 -0400
committerGitHub <noreply@github.com>2020-08-07 12:17:17 -0400
commitf3fe6961b211d898aa347771df598c531fbca90c (patch)
tree2389d40f2742f84264b3688edff377774dbfc495 /synapse/storage/databases/main/search.py
parentClarify that undoing a shutdown might not be possible (#8010) (diff)
downloadsynapse-f3fe6961b211d898aa347771df598c531fbca90c.tar.xz
Convert additional database stores to async/await (#8045)
Diffstat (limited to 'synapse/storage/databases/main/search.py')
-rw-r--r--synapse/storage/databases/main/search.py69
1 files changed, 35 insertions, 34 deletions
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index 2162d0712d..7f8d1880e5 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -16,8 +16,7 @@
 import logging
 import re
 from collections import namedtuple
-
-from twisted.internet import defer
+from typing import List, Optional
 
 from synapse.api.errors import SynapseError
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -114,8 +113,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
             self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search
         )
 
-    @defer.inlineCallbacks
-    def _background_reindex_search(self, progress, batch_size):
+    async def _background_reindex_search(self, progress, batch_size):
         # we work through the events table from highest stream id to lowest
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
@@ -206,19 +204,18 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
 
             return len(event_search_rows)
 
-        result = yield self.db_pool.runInteraction(
+        result = await self.db_pool.runInteraction(
             self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn
         )
 
         if not result:
-            yield self.db_pool.updates._end_background_update(
+            await self.db_pool.updates._end_background_update(
                 self.EVENT_SEARCH_UPDATE_NAME
             )
 
         return result
 
-    @defer.inlineCallbacks
-    def _background_reindex_gin_search(self, progress, batch_size):
+    async def _background_reindex_gin_search(self, progress, batch_size):
         """This handles old synapses which used GIST indexes, if any;
         converting them back to be GIN as per the actual schema.
         """
@@ -255,15 +252,14 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
                 conn.set_session(autocommit=False)
 
         if isinstance(self.database_engine, PostgresEngine):
-            yield self.db_pool.runWithConnection(create_index)
+            await self.db_pool.runWithConnection(create_index)
 
-        yield self.db_pool.updates._end_background_update(
+        await self.db_pool.updates._end_background_update(
             self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME
         )
         return 1
 
-    @defer.inlineCallbacks
-    def _background_reindex_search_order(self, progress, batch_size):
+    async def _background_reindex_search_order(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
         rows_inserted = progress.get("rows_inserted", 0)
@@ -288,12 +284,12 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
                 )
                 conn.set_session(autocommit=False)
 
-            yield self.db_pool.runWithConnection(create_index)
+            await self.db_pool.runWithConnection(create_index)
 
             pg = dict(progress)
             pg["have_added_indexes"] = True
 
-            yield self.db_pool.runInteraction(
+            await self.db_pool.runInteraction(
                 self.EVENT_SEARCH_ORDER_UPDATE_NAME,
                 self.db_pool.updates._background_update_progress_txn,
                 self.EVENT_SEARCH_ORDER_UPDATE_NAME,
@@ -331,12 +327,12 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
 
             return len(rows), True
 
-        num_rows, finished = yield self.db_pool.runInteraction(
+        num_rows, finished = await self.db_pool.runInteraction(
             self.EVENT_SEARCH_ORDER_UPDATE_NAME, reindex_search_txn
         )
 
         if not finished:
-            yield self.db_pool.updates._end_background_update(
+            await self.db_pool.updates._end_background_update(
                 self.EVENT_SEARCH_ORDER_UPDATE_NAME
             )
 
@@ -347,8 +343,7 @@ class SearchStore(SearchBackgroundUpdateStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
         super(SearchStore, self).__init__(database, db_conn, hs)
 
-    @defer.inlineCallbacks
-    def search_msgs(self, room_ids, search_term, keys):
+    async def search_msgs(self, room_ids, search_term, keys):
         """Performs a full text search over events with given keys.
 
         Args:
@@ -425,7 +420,7 @@ class SearchStore(SearchBackgroundUpdateStore):
         # entire table from the database.
         sql += " ORDER BY rank DESC LIMIT 500"
 
-        results = yield self.db_pool.execute(
+        results = await self.db_pool.execute(
             "search_msgs", self.db_pool.cursor_to_dict, sql, *args
         )
 
@@ -433,7 +428,7 @@ class SearchStore(SearchBackgroundUpdateStore):
 
         # We set redact_behaviour to BLOCK here to prevent redacted events being returned in
         # search results (which is a data leak)
-        events = yield self.get_events_as_list(
+        events = await self.get_events_as_list(
             [r["event_id"] for r in results],
             redact_behaviour=EventRedactBehaviour.BLOCK,
         )
@@ -442,11 +437,11 @@ class SearchStore(SearchBackgroundUpdateStore):
 
         highlights = None
         if isinstance(self.database_engine, PostgresEngine):
-            highlights = yield self._find_highlights_in_postgres(search_query, events)
+            highlights = await self._find_highlights_in_postgres(search_query, events)
 
         count_sql += " GROUP BY room_id"
 
-        count_results = yield self.db_pool.execute(
+        count_results = await self.db_pool.execute(
             "search_rooms_count", self.db_pool.cursor_to_dict, count_sql, *count_args
         )
 
@@ -462,19 +457,25 @@ class SearchStore(SearchBackgroundUpdateStore):
             "count": count,
         }
 
-    @defer.inlineCallbacks
-    def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None):
+    async def search_rooms(
+        self,
+        room_ids: List[str],
+        search_term: str,
+        keys: List[str],
+        limit,
+        pagination_token: Optional[str] = None,
+    ) -> List[dict]:
         """Performs a full text search over events with given keys.
 
         Args:
-            room_id (list): The room_ids to search in
-            search_term (str): Search term to search for
-            keys (list): List of keys to search in, currently supports
-                "content.body", "content.name", "content.topic"
-            pagination_token (str): A pagination token previously returned
+            room_ids: The room_ids to search in
+            search_term: Search term to search for
+            keys: List of keys to search in, currently supports "content.body",
+                "content.name", "content.topic"
+            pagination_token: A pagination token previously returned
 
         Returns:
-            list of dicts
+            Each match as a dictionary.
         """
         clauses = []
 
@@ -577,7 +578,7 @@ class SearchStore(SearchBackgroundUpdateStore):
 
         args.append(limit)
 
-        results = yield self.db_pool.execute(
+        results = await self.db_pool.execute(
             "search_rooms", self.db_pool.cursor_to_dict, sql, *args
         )
 
@@ -585,7 +586,7 @@ class SearchStore(SearchBackgroundUpdateStore):
 
         # We set redact_behaviour to BLOCK here to prevent redacted events being returned in
         # search results (which is a data leak)
-        events = yield self.get_events_as_list(
+        events = await self.get_events_as_list(
             [r["event_id"] for r in results],
             redact_behaviour=EventRedactBehaviour.BLOCK,
         )
@@ -594,11 +595,11 @@ class SearchStore(SearchBackgroundUpdateStore):
 
         highlights = None
         if isinstance(self.database_engine, PostgresEngine):
-            highlights = yield self._find_highlights_in_postgres(search_query, events)
+            highlights = await self._find_highlights_in_postgres(search_query, events)
 
         count_sql += " GROUP BY room_id"
 
-        count_results = yield self.db_pool.execute(
+        count_results = await self.db_pool.execute(
             "search_rooms_count", self.db_pool.cursor_to_dict, count_sql, *count_args
         )