diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index 2162d0712d..7f8d1880e5 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -16,8 +16,7 @@
import logging
import re
from collections import namedtuple
-
-from twisted.internet import defer
+from typing import List, Optional
from synapse.api.errors import SynapseError
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -114,8 +113,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search
)
- @defer.inlineCallbacks
- def _background_reindex_search(self, progress, batch_size):
+ async def _background_reindex_search(self, progress, batch_size):
# we work through the events table from highest stream id to lowest
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
@@ -206,19 +204,18 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
return len(event_search_rows)
- result = yield self.db_pool.runInteraction(
+ result = await self.db_pool.runInteraction(
self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn
)
if not result:
- yield self.db_pool.updates._end_background_update(
+ await self.db_pool.updates._end_background_update(
self.EVENT_SEARCH_UPDATE_NAME
)
return result
- @defer.inlineCallbacks
- def _background_reindex_gin_search(self, progress, batch_size):
+ async def _background_reindex_gin_search(self, progress, batch_size):
"""This handles old synapses which used GIST indexes, if any;
converting them back to be GIN as per the actual schema.
"""
@@ -255,15 +252,14 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
conn.set_session(autocommit=False)
if isinstance(self.database_engine, PostgresEngine):
- yield self.db_pool.runWithConnection(create_index)
+ await self.db_pool.runWithConnection(create_index)
- yield self.db_pool.updates._end_background_update(
+ await self.db_pool.updates._end_background_update(
self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME
)
return 1
- @defer.inlineCallbacks
- def _background_reindex_search_order(self, progress, batch_size):
+ async def _background_reindex_search_order(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)
@@ -288,12 +284,12 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
)
conn.set_session(autocommit=False)
- yield self.db_pool.runWithConnection(create_index)
+ await self.db_pool.runWithConnection(create_index)
pg = dict(progress)
pg["have_added_indexes"] = True
- yield self.db_pool.runInteraction(
+ await self.db_pool.runInteraction(
self.EVENT_SEARCH_ORDER_UPDATE_NAME,
self.db_pool.updates._background_update_progress_txn,
self.EVENT_SEARCH_ORDER_UPDATE_NAME,
@@ -331,12 +327,12 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
return len(rows), True
- num_rows, finished = yield self.db_pool.runInteraction(
+ num_rows, finished = await self.db_pool.runInteraction(
self.EVENT_SEARCH_ORDER_UPDATE_NAME, reindex_search_txn
)
if not finished:
- yield self.db_pool.updates._end_background_update(
+ await self.db_pool.updates._end_background_update(
self.EVENT_SEARCH_ORDER_UPDATE_NAME
)
@@ -347,8 +343,7 @@ class SearchStore(SearchBackgroundUpdateStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super(SearchStore, self).__init__(database, db_conn, hs)
- @defer.inlineCallbacks
- def search_msgs(self, room_ids, search_term, keys):
+ async def search_msgs(self, room_ids, search_term, keys):
"""Performs a full text search over events with given keys.
Args:
@@ -425,7 +420,7 @@ class SearchStore(SearchBackgroundUpdateStore):
# entire table from the database.
sql += " ORDER BY rank DESC LIMIT 500"
- results = yield self.db_pool.execute(
+ results = await self.db_pool.execute(
"search_msgs", self.db_pool.cursor_to_dict, sql, *args
)
@@ -433,7 +428,7 @@ class SearchStore(SearchBackgroundUpdateStore):
# We set redact_behaviour to BLOCK here to prevent redacted events being returned in
# search results (which is a data leak)
- events = yield self.get_events_as_list(
+ events = await self.get_events_as_list(
[r["event_id"] for r in results],
redact_behaviour=EventRedactBehaviour.BLOCK,
)
@@ -442,11 +437,11 @@ class SearchStore(SearchBackgroundUpdateStore):
highlights = None
if isinstance(self.database_engine, PostgresEngine):
- highlights = yield self._find_highlights_in_postgres(search_query, events)
+ highlights = await self._find_highlights_in_postgres(search_query, events)
count_sql += " GROUP BY room_id"
- count_results = yield self.db_pool.execute(
+ count_results = await self.db_pool.execute(
"search_rooms_count", self.db_pool.cursor_to_dict, count_sql, *count_args
)
@@ -462,19 +457,25 @@ class SearchStore(SearchBackgroundUpdateStore):
"count": count,
}
- @defer.inlineCallbacks
- def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None):
+ async def search_rooms(
+ self,
+ room_ids: List[str],
+ search_term: str,
+ keys: List[str],
+ limit,
+ pagination_token: Optional[str] = None,
+ ) -> List[dict]:
"""Performs a full text search over events with given keys.
Args:
- room_id (list): The room_ids to search in
- search_term (str): Search term to search for
- keys (list): List of keys to search in, currently supports
- "content.body", "content.name", "content.topic"
- pagination_token (str): A pagination token previously returned
+ room_ids: The room_ids to search in
+ search_term: Search term to search for
+ keys: List of keys to search in, currently supports "content.body",
+ "content.name", "content.topic"
+ pagination_token: A pagination token previously returned
Returns:
- list of dicts
+ Each match as a dictionary.
"""
clauses = []
@@ -577,7 +578,7 @@ class SearchStore(SearchBackgroundUpdateStore):
args.append(limit)
- results = yield self.db_pool.execute(
+ results = await self.db_pool.execute(
"search_rooms", self.db_pool.cursor_to_dict, sql, *args
)
@@ -585,7 +586,7 @@ class SearchStore(SearchBackgroundUpdateStore):
# We set redact_behaviour to BLOCK here to prevent redacted events being returned in
# search results (which is a data leak)
- events = yield self.get_events_as_list(
+ events = await self.get_events_as_list(
[r["event_id"] for r in results],
redact_behaviour=EventRedactBehaviour.BLOCK,
)
@@ -594,11 +595,11 @@ class SearchStore(SearchBackgroundUpdateStore):
highlights = None
if isinstance(self.database_engine, PostgresEngine):
- highlights = yield self._find_highlights_in_postgres(search_query, events)
+ highlights = await self._find_highlights_in_postgres(search_query, events)
count_sql += " GROUP BY room_id"
- count_results = yield self.db_pool.execute(
+ count_results = await self.db_pool.execute(
"search_rooms_count", self.db_pool.cursor_to_dict, count_sql, *count_args
)
|