diff --git a/changelog.d/7516.misc b/changelog.d/7516.misc
new file mode 100644
index 0000000000..94b0fd49b2
--- /dev/null
+++ b/changelog.d/7516.misc
@@ -0,0 +1 @@
+Add a worker store for search insertion, required for moving event persistence off master.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 2e3add7ac5..ab801108ca 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -122,6 +122,7 @@ from synapse.storage.data_stores.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.storage.data_stores.main.presence import UserPresenceState
+from synapse.storage.data_stores.main.search import SearchWorkerStore
from synapse.storage.data_stores.main.ui_auth import UIAuthWorkerStore
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
from synapse.types import ReadReceipt
@@ -451,6 +452,7 @@ class GenericWorkerSlavedStore(
SlavedFilteringStore,
MonthlyActiveUsersWorkerStore,
MediaRepositoryStore,
+ SearchWorkerStore,
BaseSlavedStore,
):
def __init__(self, database, db_conn, hs):
diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py
index ee75b92344..13f49d8060 100644
--- a/synapse/storage/data_stores/main/search.py
+++ b/synapse/storage/data_stores/main/search.py
@@ -37,7 +37,55 @@ SearchEntry = namedtuple(
)
-class SearchBackgroundUpdateStore(SQLBaseStore):
+class SearchWorkerStore(SQLBaseStore):
+ def store_search_entries_txn(self, txn, entries):
+ """Add entries to the search table
+
+ Args:
+ txn (cursor):
+ entries (iterable[SearchEntry]):
+ entries to be added to the table
+ """
+ if not self.hs.config.enable_search:
+ return
+ if isinstance(self.database_engine, PostgresEngine):
+ sql = (
+ "INSERT INTO event_search"
+ " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
+ " VALUES (?,?,?,to_tsvector('english', ?),?,?)"
+ )
+
+ args = (
+ (
+ entry.event_id,
+ entry.room_id,
+ entry.key,
+ entry.value,
+ entry.stream_ordering,
+ entry.origin_server_ts,
+ )
+ for entry in entries
+ )
+
+ txn.executemany(sql, args)
+
+ elif isinstance(self.database_engine, Sqlite3Engine):
+ sql = (
+ "INSERT INTO event_search (event_id, room_id, key, value)"
+ " VALUES (?,?,?,?)"
+ )
+ args = (
+ (entry.event_id, entry.room_id, entry.key, entry.value)
+ for entry in entries
+ )
+
+ txn.executemany(sql, args)
+ else:
+ # This should be unreachable.
+ raise Exception("Unrecognized database engine")
+
+
+class SearchBackgroundUpdateStore(SearchWorkerStore):
EVENT_SEARCH_UPDATE_NAME = "event_search"
EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
@@ -296,52 +344,6 @@ class SearchBackgroundUpdateStore(SQLBaseStore):
return num_rows
- def store_search_entries_txn(self, txn, entries):
- """Add entries to the search table
-
- Args:
- txn (cursor):
- entries (iterable[SearchEntry]):
- entries to be added to the table
- """
- if not self.hs.config.enable_search:
- return
- if isinstance(self.database_engine, PostgresEngine):
- sql = (
- "INSERT INTO event_search"
- " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
- " VALUES (?,?,?,to_tsvector('english', ?),?,?)"
- )
-
- args = (
- (
- entry.event_id,
- entry.room_id,
- entry.key,
- entry.value,
- entry.stream_ordering,
- entry.origin_server_ts,
- )
- for entry in entries
- )
-
- txn.executemany(sql, args)
-
- elif isinstance(self.database_engine, Sqlite3Engine):
- sql = (
- "INSERT INTO event_search (event_id, room_id, key, value)"
- " VALUES (?,?,?,?)"
- )
- args = (
- (entry.event_id, entry.room_id, entry.key, entry.value)
- for entry in entries
- )
-
- txn.executemany(sql, args)
- else:
- # This should be unreachable.
- raise Exception("Unrecognized database engine")
-
class SearchStore(SearchBackgroundUpdateStore):
def __init__(self, database: Database, db_conn, hs):
|