From 19f9227643b5099666878de33453bbe361f216fc Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 9 Jan 2018 16:25:04 +0000 Subject: avoid 80s GIN inserts by tweaking work_mem see https://github.com/matrix-org/synapse/issues/2753 for details --- synapse/storage/search.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/storage/search.py') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 479b04c636..7b1166f417 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -106,6 +106,7 @@ class SearchStore(BackgroundUpdateStore): event_search_rows.append((event_id, room_id, key, value)) if isinstance(self.database_engine, PostgresEngine): + txn.execute("SET work_mem='256KB'") sql = ( "INSERT INTO event_search (event_id, room_id, key, vector)" " VALUES (?,?,?,to_tsvector('english', ?))" @@ -123,6 +124,9 @@ class SearchStore(BackgroundUpdateStore): clump = event_search_rows[index:index + INSERT_CLUMP_SIZE] txn.executemany(sql, clump) + if isinstance(self.database_engine, PostgresEngine): + txn.execute("RESET work_mem") + progress = { "target_min_stream_id_inclusive": target_min_stream_id, "max_stream_id_exclusive": min_stream_id, -- cgit 1.4.1 From e79db0a673ef79bfa30e435bf64b5c3b75ed98d9 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 9 Jan 2018 16:37:48 +0000 Subject: switch back from GIST to GIN indexes --- synapse/storage/search.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) (limited to 'synapse/storage/search.py') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 479b04c636..ba7141563e 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -31,7 +31,7 @@ class SearchStore(BackgroundUpdateStore): EVENT_SEARCH_UPDATE_NAME = "event_search" EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order" - EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist" + EVENT_SEARCH_USE_GIN_POSTGRES_NAME = "event_search_postgres_gin" def __init__(self, db_conn, hs): super(SearchStore, self).__init__(db_conn, hs) @@ -43,8 +43,8 @@ class SearchStore(BackgroundUpdateStore): self._background_reindex_search_order ) self.register_background_update_handler( - self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME, - self._background_reindex_gist_search + self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, + self._background_reindex_gin_search ) @defer.inlineCallbacks @@ -145,25 +145,30 @@ class SearchStore(BackgroundUpdateStore): defer.returnValue(result) @defer.inlineCallbacks - def _background_reindex_gist_search(self, progress, batch_size): + def _background_reindex_gin_search(self, progress, batch_size): + '''This handles old synapses which used GIST indexes; converting them + back to be GIN as per the actual schema. Otherwise it crashes out + as a NOOP + ''' + def create_index(conn): conn.rollback() conn.set_session(autocommit=True) c = conn.cursor() c.execute( - "CREATE INDEX CONCURRENTLY event_search_fts_idx_gist" - " ON event_search USING GIST (vector)" + "CREATE INDEX CONCURRENTLY event_search_fts_idx" + " ON event_search USING GIN (vector)" ) - c.execute("DROP INDEX event_search_fts_idx") + c.execute("DROP INDEX event_search_fts_idx_gist") conn.set_session(autocommit=False) if isinstance(self.database_engine, PostgresEngine): yield self.runWithConnection(create_index) - yield self._end_background_update(self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME) + yield self._end_background_update(self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME) defer.returnValue(1) @defer.inlineCallbacks -- cgit 1.4.1 From a66f489678dc05fa89e6849405c37a9a390e62fc Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 9 Jan 2018 16:55:51 +0000 Subject: fix GIST->GIN switch --- .../storage/schema/delta/38/postgres_fts_gist.sql | 6 +++-- .../storage/schema/delta/46/postgres_fts_gin.sql | 17 +++++++++++++ synapse/storage/search.py | 28 ++++++++++++---------- 3 files changed, 37 insertions(+), 14 deletions(-) create mode 100644 synapse/storage/schema/delta/46/postgres_fts_gin.sql (limited to 'synapse/storage/search.py') diff --git a/synapse/storage/schema/delta/38/postgres_fts_gist.sql b/synapse/storage/schema/delta/38/postgres_fts_gist.sql index f090a7b75a..5fe27d6877 100644 --- a/synapse/storage/schema/delta/38/postgres_fts_gist.sql +++ b/synapse/storage/schema/delta/38/postgres_fts_gist.sql @@ -13,5 +13,7 @@ * limitations under the License. */ - INSERT into background_updates (update_name, progress_json) - VALUES ('event_search_postgres_gist', '{}'); +-- We no longer do this given we back it out again in schema 46 + +-- INSERT into background_updates (update_name, progress_json) +-- VALUES ('event_search_postgres_gist', '{}'); diff --git a/synapse/storage/schema/delta/46/postgres_fts_gin.sql b/synapse/storage/schema/delta/46/postgres_fts_gin.sql new file mode 100644 index 0000000000..31d7a817eb --- /dev/null +++ b/synapse/storage/schema/delta/46/postgres_fts_gin.sql @@ -0,0 +1,17 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT into background_updates (update_name, progress_json) + VALUES ('event_search_postgres_gin', '{}'); diff --git a/synapse/storage/search.py b/synapse/storage/search.py index ba7141563e..d3e76b58d6 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -146,24 +146,28 @@ class SearchStore(BackgroundUpdateStore): @defer.inlineCallbacks def _background_reindex_gin_search(self, progress, batch_size): - '''This handles old synapses which used GIST indexes; converting them - back to be GIN as per the actual schema. Otherwise it crashes out - as a NOOP + '''This handles old synapses which used GIST indexes, if any; + converting them back to be GIN as per the actual schema. ''' def create_index(conn): - conn.rollback() - conn.set_session(autocommit=True) - c = conn.cursor() + try: + conn.rollback() + conn.set_session(autocommit=True) + c = conn.cursor() - c.execute( - "CREATE INDEX CONCURRENTLY event_search_fts_idx" - " ON event_search USING GIN (vector)" - ) + c.execute( + "CREATE INDEX CONCURRENTLY event_search_fts_idx" + " ON event_search USING GIN (vector)" + ) - c.execute("DROP INDEX event_search_fts_idx_gist") + c.execute("DROP INDEX event_search_fts_idx_gist") - conn.set_session(autocommit=False) + conn.set_session(autocommit=False) + except e: + logger.warn( + "Ignoring error %s when trying to switch from GIST to GIN" % (e,) + ) if isinstance(self.database_engine, PostgresEngine): yield self.runWithConnection(create_index) -- cgit 1.4.1 From 174eacc8ba71015003a78594ebc89cbe45d8384a Mon Sep 17 00:00:00 2001 From: hera Date: Tue, 9 Jan 2018 18:06:30 +0000 Subject: oops --- synapse/storage/room.py | 2 +- synapse/storage/search.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/search.py') diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 9e2bf1ab48..0604f8f270 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -310,7 +310,7 @@ class RoomStore(SQLBaseStore): def _store_event_search_txn(self, txn, event, key, value): if isinstance(self.database_engine, PostgresEngine): - txn.execute("SET work_mem='256KB'") + txn.execute("SET work_mem='256kB'") sql = ( "INSERT INTO event_search" " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)" diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 7b1166f417..f52f3c8592 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -106,7 +106,7 @@ class SearchStore(BackgroundUpdateStore): event_search_rows.append((event_id, room_id, key, value)) if isinstance(self.database_engine, PostgresEngine): - txn.execute("SET work_mem='256KB'") + txn.execute("SET work_mem='256kB'") sql = ( "INSERT INTO event_search (event_id, room_id, key, vector)" " VALUES (?,?,?,to_tsvector('english', ?))" -- cgit 1.4.1 From 6b02fc80d173d3d4de81623d411a136abe1637e9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 2 Feb 2018 14:32:51 +0000 Subject: Reinstate event_search_postgres_gist handler People may have queued updates for this, so we can't just delete it. --- synapse/storage/background_updates.py | 19 +++++++++++++++++++ synapse/storage/registration.py | 7 +------ synapse/storage/search.py | 11 +++++++++++ 3 files changed, 31 insertions(+), 6 deletions(-) (limited to 'synapse/storage/search.py') diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 11a1b942f1..c88759bf2c 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -242,6 +242,25 @@ class BackgroundUpdateStore(SQLBaseStore): """ self._background_update_handlers[update_name] = update_handler + def register_noop_background_update(self, update_name): + """Register a noop handler for a background update. + + This is useful when we previously did a background update, but no + longer wish to do the update. In this case the background update should + be removed from the schema delta files, but there may still be some + users who have the background update queued, so this method should + also be called to clear the update. + + Args: + update_name (str): Name of update + """ + @defer.inlineCallbacks + def noop_update(progress, batch_size): + yield self._end_background_update(update_name) + defer.returnValue(1) + + self.register_background_update_handler(update_name, noop_update) + def register_background_index_update(self, update_name, index_name, table, columns, where_clause=None, unique=False, diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 3aa810981f..95f75d6df1 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -39,12 +39,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): # we no longer use refresh tokens, but it's possible that some people # might have a background update queued to build this index. Just # clear the background update. - @defer.inlineCallbacks - def noop_update(progress, batch_size): - yield self._end_background_update("refresh_tokens_device_index") - defer.returnValue(1) - self.register_background_update_handler( - "refresh_tokens_device_index", noop_update) + self.register_noop_background_update("refresh_tokens_device_index") @defer.inlineCallbacks def add_access_token_to_user(self, user_id, token, device_id=None): diff --git a/synapse/storage/search.py b/synapse/storage/search.py index d3e76b58d6..13c827cf87 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -31,6 +31,7 @@ class SearchStore(BackgroundUpdateStore): EVENT_SEARCH_UPDATE_NAME = "event_search" EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order" + EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist" EVENT_SEARCH_USE_GIN_POSTGRES_NAME = "event_search_postgres_gin" def __init__(self, db_conn, hs): @@ -42,6 +43,16 @@ class SearchStore(BackgroundUpdateStore): self.EVENT_SEARCH_ORDER_UPDATE_NAME, self._background_reindex_search_order ) + + # we used to have a background update to turn the GIN index into a + # GIST one; we no longer do that (obviously) because we actually want + # a GIN index. However, it's possible that some people might still have + # the background update queued, so we register a handler to clear the + # background update. + self.register_noop_background_update( + self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME, + ) + self.register_background_update_handler( self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search -- cgit 1.4.1 From 4eeae7ad657729eb8c2765da6fb40fc983c740f7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Sat, 3 Feb 2018 22:57:33 +0000 Subject: Move store_event_search_txn to SearchStore ... as a precursor to making event storing and doing the bg update share some code. --- synapse/storage/room.py | 45 ++++++++------------------------------------- synapse/storage/search.py | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 37 deletions(-) (limited to 'synapse/storage/search.py') diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 9f373b47e0..0fcfb7f86d 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -16,11 +16,9 @@ from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.storage.search import SearchStore from synapse.util.caches.descriptors import cached, cachedInlineCallbacks -from ._base import SQLBaseStore -from .engines import PostgresEngine, Sqlite3Engine - import collections import logging import ujson as json @@ -40,7 +38,7 @@ RatelimitOverride = collections.namedtuple( ) -class RoomStore(SQLBaseStore): +class RoomStore(SearchStore): @defer.inlineCallbacks def store_room(self, room_id, room_creator_user_id, is_public): @@ -263,8 +261,8 @@ class RoomStore(SQLBaseStore): }, ) - self._store_event_search_txn( - txn, event, "content.topic", event.content["topic"] + self.store_event_search_txn( + txn, event, "content.topic", event.content["topic"], ) def _store_room_name_txn(self, txn, event): @@ -279,14 +277,14 @@ class RoomStore(SQLBaseStore): } ) - self._store_event_search_txn( - txn, event, "content.name", event.content["name"] + self.store_event_search_txn( + txn, event, "content.name", event.content["name"], ) def _store_room_message_txn(self, txn, event): if hasattr(event, "content") and "body" in event.content: - self._store_event_search_txn( - txn, event, "content.body", event.content["body"] + self.store_event_search_txn( + txn, event, "content.body", event.content["body"], ) def _store_history_visibility_txn(self, txn, event): @@ -308,33 +306,6 @@ class RoomStore(SQLBaseStore): event.content[key] )) - def _store_event_search_txn(self, txn, event, key, value): - if isinstance(self.database_engine, PostgresEngine): - txn.execute("SET work_mem='256kB'") - sql = ( - "INSERT INTO event_search" - " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)" - " VALUES (?,?,?,to_tsvector('english', ?),?,?)" - ) - txn.execute( - sql, - ( - event.event_id, event.room_id, key, value, - event.internal_metadata.stream_ordering, - event.origin_server_ts, - ) - ) - txn.execute("RESET work_mem") - elif isinstance(self.database_engine, Sqlite3Engine): - sql = ( - "INSERT INTO event_search (event_id, room_id, key, value)" - " VALUES (?,?,?,?)" - ) - txn.execute(sql, (event.event_id, event.room_id, key, value,)) - else: - # This should be unreachable. - raise Exception("Unrecognized database engine") - def add_event_report(self, room_id, event_id, user_id, reason, content, received_ts): next_id = self._event_reports_id_gen.get_next() diff --git a/synapse/storage/search.py b/synapse/storage/search.py index f52f3c8592..205e8d0017 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -246,6 +246,41 @@ class SearchStore(BackgroundUpdateStore): defer.returnValue(num_rows) + def store_event_search_txn(self, txn, event, key, value): + """Add event to the search table + + Args: + txn (cursor): + event (EventBase): + key (str): + value (str): + """ + if isinstance(self.database_engine, PostgresEngine): + txn.execute("SET work_mem='256kB'") + sql = ( + "INSERT INTO event_search" + " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)" + " VALUES (?,?,?,to_tsvector('english', ?),?,?)" + ) + txn.execute( + sql, + ( + event.event_id, event.room_id, key, value, + event.internal_metadata.stream_ordering, + event.origin_server_ts, + ) + ) + txn.execute("RESET work_mem") + elif isinstance(self.database_engine, Sqlite3Engine): + sql = ( + "INSERT INTO event_search (event_id, room_id, key, value)" + " VALUES (?,?,?,?)" + ) + txn.execute(sql, (event.event_id, event.room_id, key, value,)) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + @defer.inlineCallbacks def search_msgs(self, room_ids, search_term, keys): """Performs a full text search over events with given keys. -- cgit 1.4.1 From bd25f9cf36ff86d1616853d88cebd2a4a83fa552 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Sat, 3 Feb 2018 23:05:41 +0000 Subject: Clean up work_mem handling Add some comments and improve exception handling when twiddling work_mem for the search update --- synapse/storage/search.py | 52 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 41 insertions(+), 11 deletions(-) (limited to 'synapse/storage/search.py') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 205e8d0017..190751bade 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import sys from twisted.internet import defer from .background_updates import BackgroundUpdateStore @@ -256,21 +256,51 @@ class SearchStore(BackgroundUpdateStore): value (str): """ if isinstance(self.database_engine, PostgresEngine): - txn.execute("SET work_mem='256kB'") sql = ( "INSERT INTO event_search" - " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)" + " (event_id, room_id, key, vector, stream_ordering, " + " origin_server_ts)" " VALUES (?,?,?,to_tsvector('english', ?),?,?)" ) - txn.execute( - sql, - ( - event.event_id, event.room_id, key, value, - event.internal_metadata.stream_ordering, - event.origin_server_ts, + + # inserts to a GIN index are normally batched up into a pending + # list, and then all committed together once the list gets to a + # certain size. The trouble with that is that postgres (pre-9.5) + # uses work_mem to determine the length of the list, and work_mem + # is typically very large. + # + # We therefore reduce work_mem while we do the insert. + # + # (postgres 9.5 uses the separate gin_pending_list_limit setting, + # so doesn't suffer the same problem, but changing work_mem will + # be harmless) + + txn.execute("SET work_mem='256kB'") + try: + txn.execute( + sql, + ( + event.event_id, event.room_id, key, value, + event.internal_metadata.stream_ordering, + event.origin_server_ts, + ) ) - ) - txn.execute("RESET work_mem") + except Exception: + # we need to reset work_mem, but doing so may throw a new + # exception and we want to preserve the original + t, v, tb = sys.exc_info() + try: + txn.execute("RESET work_mem") + except Exception as e: + logger.warn( + "exception resetting work_mem during exception " + "handling: %r", + e, + ) + raise t, v, tb + else: + txn.execute("RESET work_mem") + elif isinstance(self.database_engine, Sqlite3Engine): sql = ( "INSERT INTO event_search (event_id, room_id, key, value)" -- cgit 1.4.1 From 80b8a28100e29e34bdc6226513575789310aa41f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Sat, 3 Feb 2018 23:07:13 +0000 Subject: Factor out common code for search insert we can reuse the same code as is used for event insert, for doing the background index population. --- synapse/storage/search.py | 89 +++++++++++++++++++++++++++++------------------ 1 file changed, 56 insertions(+), 33 deletions(-) (limited to 'synapse/storage/search.py') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 190751bade..eecf778516 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from collections import namedtuple import sys from twisted.internet import defer @@ -26,6 +27,11 @@ import ujson as json logger = logging.getLogger(__name__) +SearchEntry = namedtuple('SearchEntry', [ + 'key', 'value', 'event_id', 'room_id', 'stream_ordering', + 'origin_server_ts', +]) + class SearchStore(BackgroundUpdateStore): @@ -49,16 +55,17 @@ class SearchStore(BackgroundUpdateStore): @defer.inlineCallbacks def _background_reindex_search(self, progress, batch_size): + # we work through the events table from highest stream id to lowest target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] rows_inserted = progress.get("rows_inserted", 0) - INSERT_CLUMP_SIZE = 1000 TYPES = ["m.room.name", "m.room.message", "m.room.topic"] def reindex_search_txn(txn): sql = ( - "SELECT stream_ordering, event_id, room_id, type, content FROM events" + "SELECT stream_ordering, event_id, room_id, type, content, " + " origin_server_ts FROM events" " WHERE ? <= stream_ordering AND stream_ordering < ?" " AND (%s)" " ORDER BY stream_ordering DESC" @@ -67,6 +74,10 @@ class SearchStore(BackgroundUpdateStore): txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) + # we could stream straight from the results into + # store_search_entries_txn with a generator function, but that + # would mean having two cursors open on the database at once. + # Instead we just build a list of results. rows = self.cursor_to_dict(txn) if not rows: return 0 @@ -79,6 +90,8 @@ class SearchStore(BackgroundUpdateStore): event_id = row["event_id"] room_id = row["room_id"] etype = row["type"] + stream_ordering = row["stream_ordering"] + origin_server_ts = row["origin_server_ts"] try: content = json.loads(row["content"]) except Exception: @@ -93,6 +106,8 @@ class SearchStore(BackgroundUpdateStore): elif etype == "m.room.name": key = "content.name" value = content["name"] + else: + raise Exception("unexpected event type %s" % etype) except (KeyError, AttributeError): # If the event is missing a necessary field then # skip over it. @@ -103,29 +118,16 @@ class SearchStore(BackgroundUpdateStore): # then skip over it continue - event_search_rows.append((event_id, room_id, key, value)) - - if isinstance(self.database_engine, PostgresEngine): - txn.execute("SET work_mem='256kB'") - sql = ( - "INSERT INTO event_search (event_id, room_id, key, vector)" - " VALUES (?,?,?,to_tsvector('english', ?))" - ) - elif isinstance(self.database_engine, Sqlite3Engine): - sql = ( - "INSERT INTO event_search (event_id, room_id, key, value)" - " VALUES (?,?,?,?)" - ) - else: - # This should be unreachable. - raise Exception("Unrecognized database engine") - - for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE): - clump = event_search_rows[index:index + INSERT_CLUMP_SIZE] - txn.executemany(sql, clump) + event_search_rows.append(SearchEntry( + key=key, + value=value, + event_id=event_id, + room_id=room_id, + stream_ordering=stream_ordering, + origin_server_ts=origin_server_ts, + )) - if isinstance(self.database_engine, PostgresEngine): - txn.execute("RESET work_mem") + self.store_search_entries_txn(txn, event_search_rows) progress = { "target_min_stream_id_inclusive": target_min_stream_id, @@ -255,6 +257,26 @@ class SearchStore(BackgroundUpdateStore): key (str): value (str): """ + self.store_search_entries_txn( + txn, + (SearchEntry( + key=key, + value=value, + event_id=event.event_id, + room_id=event.room_id, + stream_ordering=event.internal_metadata.stream_ordering, + origin_server_ts=event.origin_server_ts, + ),), + ) + + def store_search_entries_txn(self, txn, entries): + """Add entries to the search table + + Args: + txn (cursor): + entries (iterable[SearchEntry]): + entries to be added to the table + """ if isinstance(self.database_engine, PostgresEngine): sql = ( "INSERT INTO event_search" @@ -262,6 +284,10 @@ class SearchStore(BackgroundUpdateStore): " origin_server_ts)" " VALUES (?,?,?,to_tsvector('english', ?),?,?)" ) + args = (( + entry.event_id, entry.room_id, entry.key, entry.value, + entry.stream_ordering, entry.origin_server_ts, + ) for entry in entries) # inserts to a GIN index are normally batched up into a pending # list, and then all committed together once the list gets to a @@ -277,14 +303,7 @@ class SearchStore(BackgroundUpdateStore): txn.execute("SET work_mem='256kB'") try: - txn.execute( - sql, - ( - event.event_id, event.room_id, key, value, - event.internal_metadata.stream_ordering, - event.origin_server_ts, - ) - ) + txn.executemany(sql, args) except Exception: # we need to reset work_mem, but doing so may throw a new # exception and we want to preserve the original @@ -306,7 +325,11 @@ class SearchStore(BackgroundUpdateStore): "INSERT INTO event_search (event_id, room_id, key, value)" " VALUES (?,?,?,?)" ) - txn.execute(sql, (event.event_id, event.room_id, key, value,)) + args = (( + entry.event_id, entry.room_id, entry.key, entry.value, + ) for entry in entries) + + txn.executemany(sql, args) else: # This should be unreachable. raise Exception("Unrecognized database engine") -- cgit 1.4.1 From 4a6d5517049c5b8b9e43df43a10a0dda5db07244 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 2 Feb 2018 15:25:27 +0000 Subject: GIN reindex: Fix syntax errors, improve exception handling --- synapse/storage/search.py | 40 +++++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 13 deletions(-) (limited to 'synapse/storage/search.py') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 13c827cf87..076ecff297 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -157,28 +157,42 @@ class SearchStore(BackgroundUpdateStore): @defer.inlineCallbacks def _background_reindex_gin_search(self, progress, batch_size): - '''This handles old synapses which used GIST indexes, if any; + """This handles old synapses which used GIST indexes, if any; converting them back to be GIN as per the actual schema. - ''' + """ def create_index(conn): + conn.rollback() + + # we have to set autocommit, because postgres refuses to + # CREATE INDEX CONCURRENTLY without it. + conn.set_session(autocommit=True) + try: - conn.rollback() - conn.set_session(autocommit=True) c = conn.cursor() + # if we skipped the conversion to GIST, we may already/still + # have an event_search_fts_idx; unfortunately postgres 9.4 + # doesn't support CREATE INDEX IF EXISTS so we just catch the + # exception and ignore it. + import psycopg2 + try: + c.execute( + "CREATE INDEX CONCURRENTLY event_search_fts_idx" + " ON event_search USING GIN (vector)" + ) + except psycopg2.ProgrammingError as e: + logger.warn( + "Ignoring error %r when trying to switch from GIST to GIN", + e + ) + + # we should now be able to delete the GIST index. c.execute( - "CREATE INDEX CONCURRENTLY event_search_fts_idx" - " ON event_search USING GIN (vector)" + "DROP INDEX IF EXISTS event_search_fts_idx_gist" ) - - c.execute("DROP INDEX event_search_fts_idx_gist") - + finally: conn.set_session(autocommit=False) - except e: - logger.warn( - "Ignoring error %s when trying to switch from GIST to GIN" % (e,) - ) if isinstance(self.database_engine, PostgresEngine): yield self.runWithConnection(create_index) -- cgit 1.4.1 From 5978dccff09e647509bb92e8125aa02e87f7a0a2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 14 Feb 2018 15:54:09 +0000 Subject: remove overzealous exception handling --- synapse/storage/search.py | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) (limited to 'synapse/storage/search.py') diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 8d294d497b..2755acff40 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -16,7 +16,6 @@ from collections import namedtuple import logging import re -import sys import ujson as json from twisted.internet import defer @@ -335,25 +334,18 @@ class SearchStore(BackgroundUpdateStore): # (postgres 9.5 uses the separate gin_pending_list_limit setting, # so doesn't suffer the same problem, but changing work_mem will # be harmless) + # + # Note that we don't need to worry about restoring it on + # exception, because exceptions will cause the transaction to be + # rolled back, including the effects of the SET command. + # + # Also: we use SET rather than SET LOCAL because there's lots of + # other stuff going on in this transaction, which want to have the + # normal work_mem setting. txn.execute("SET work_mem='256kB'") - try: - txn.executemany(sql, args) - except Exception: - # we need to reset work_mem, but doing so may throw a new - # exception and we want to preserve the original - t, v, tb = sys.exc_info() - try: - txn.execute("RESET work_mem") - except Exception as e: - logger.warn( - "exception resetting work_mem during exception " - "handling: %r", - e, - ) - raise t, v, tb - else: - txn.execute("RESET work_mem") + txn.executemany(sql, args) + txn.execute("RESET work_mem") elif isinstance(self.database_engine, Sqlite3Engine): sql = ( -- cgit 1.4.1