diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/storage/search.py | 89 |
1 files changed, 56 insertions, 33 deletions
diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 190751bade..eecf778516 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from collections import namedtuple import sys from twisted.internet import defer @@ -26,6 +27,11 @@ import ujson as json logger = logging.getLogger(__name__) +SearchEntry = namedtuple('SearchEntry', [ + 'key', 'value', 'event_id', 'room_id', 'stream_ordering', + 'origin_server_ts', +]) + class SearchStore(BackgroundUpdateStore): @@ -49,16 +55,17 @@ class SearchStore(BackgroundUpdateStore): @defer.inlineCallbacks def _background_reindex_search(self, progress, batch_size): + # we work through the events table from highest stream id to lowest target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] rows_inserted = progress.get("rows_inserted", 0) - INSERT_CLUMP_SIZE = 1000 TYPES = ["m.room.name", "m.room.message", "m.room.topic"] def reindex_search_txn(txn): sql = ( - "SELECT stream_ordering, event_id, room_id, type, content FROM events" + "SELECT stream_ordering, event_id, room_id, type, content, " + " origin_server_ts FROM events" " WHERE ? <= stream_ordering AND stream_ordering < ?" " AND (%s)" " ORDER BY stream_ordering DESC" @@ -67,6 +74,10 @@ class SearchStore(BackgroundUpdateStore): txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) + # we could stream straight from the results into + # store_search_entries_txn with a generator function, but that + # would mean having two cursors open on the database at once. + # Instead we just build a list of results. rows = self.cursor_to_dict(txn) if not rows: return 0 @@ -79,6 +90,8 @@ class SearchStore(BackgroundUpdateStore): event_id = row["event_id"] room_id = row["room_id"] etype = row["type"] + stream_ordering = row["stream_ordering"] + origin_server_ts = row["origin_server_ts"] try: content = json.loads(row["content"]) except Exception: @@ -93,6 +106,8 @@ class SearchStore(BackgroundUpdateStore): elif etype == "m.room.name": key = "content.name" value = content["name"] + else: + raise Exception("unexpected event type %s" % etype) except (KeyError, AttributeError): # If the event is missing a necessary field then # skip over it. @@ -103,29 +118,16 @@ class SearchStore(BackgroundUpdateStore): # then skip over it continue - event_search_rows.append((event_id, room_id, key, value)) - - if isinstance(self.database_engine, PostgresEngine): - txn.execute("SET work_mem='256kB'") - sql = ( - "INSERT INTO event_search (event_id, room_id, key, vector)" - " VALUES (?,?,?,to_tsvector('english', ?))" - ) - elif isinstance(self.database_engine, Sqlite3Engine): - sql = ( - "INSERT INTO event_search (event_id, room_id, key, value)" - " VALUES (?,?,?,?)" - ) - else: - # This should be unreachable. - raise Exception("Unrecognized database engine") - - for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE): - clump = event_search_rows[index:index + INSERT_CLUMP_SIZE] - txn.executemany(sql, clump) + event_search_rows.append(SearchEntry( + key=key, + value=value, + event_id=event_id, + room_id=room_id, + stream_ordering=stream_ordering, + origin_server_ts=origin_server_ts, + )) - if isinstance(self.database_engine, PostgresEngine): - txn.execute("RESET work_mem") + self.store_search_entries_txn(txn, event_search_rows) progress = { "target_min_stream_id_inclusive": target_min_stream_id, @@ -255,6 +257,26 @@ class SearchStore(BackgroundUpdateStore): key (str): value (str): """ + self.store_search_entries_txn( + txn, + (SearchEntry( + key=key, + value=value, + event_id=event.event_id, + room_id=event.room_id, + stream_ordering=event.internal_metadata.stream_ordering, + origin_server_ts=event.origin_server_ts, + ),), + ) + + def store_search_entries_txn(self, txn, entries): + """Add entries to the search table + + Args: + txn (cursor): + entries (iterable[SearchEntry]): + entries to be added to the table + """ if isinstance(self.database_engine, PostgresEngine): sql = ( "INSERT INTO event_search" @@ -262,6 +284,10 @@ class SearchStore(BackgroundUpdateStore): " origin_server_ts)" " VALUES (?,?,?,to_tsvector('english', ?),?,?)" ) + args = (( + entry.event_id, entry.room_id, entry.key, entry.value, + entry.stream_ordering, entry.origin_server_ts, + ) for entry in entries) # inserts to a GIN index are normally batched up into a pending # list, and then all committed together once the list gets to a @@ -277,14 +303,7 @@ class SearchStore(BackgroundUpdateStore): txn.execute("SET work_mem='256kB'") try: - txn.execute( - sql, - ( - event.event_id, event.room_id, key, value, - event.internal_metadata.stream_ordering, - event.origin_server_ts, - ) - ) + txn.executemany(sql, args) except Exception: # we need to reset work_mem, but doing so may throw a new # exception and we want to preserve the original @@ -306,7 +325,11 @@ class SearchStore(BackgroundUpdateStore): "INSERT INTO event_search (event_id, room_id, key, value)" " VALUES (?,?,?,?)" ) - txn.execute(sql, (event.event_id, event.room_id, key, value,)) + args = (( + entry.event_id, entry.room_id, entry.key, entry.value, + ) for entry in entries) + + txn.executemany(sql, args) else: # This should be unreachable. raise Exception("Unrecognized database engine") |