summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/7516.misc1
-rw-r--r--synapse/app/generic_worker.py2
-rw-r--r--synapse/storage/data_stores/main/search.py96
3 files changed, 52 insertions, 47 deletions
diff --git a/changelog.d/7516.misc b/changelog.d/7516.misc
new file mode 100644
index 0000000000..94b0fd49b2
--- /dev/null
+++ b/changelog.d/7516.misc
@@ -0,0 +1 @@
+Add a worker store for search insertion, required for moving event persistence off master.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 2e3add7ac5..ab801108ca 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -122,6 +122,7 @@ from synapse.storage.data_stores.main.monthly_active_users import (
     MonthlyActiveUsersWorkerStore,
 )
 from synapse.storage.data_stores.main.presence import UserPresenceState
+from synapse.storage.data_stores.main.search import SearchWorkerStore
 from synapse.storage.data_stores.main.ui_auth import UIAuthWorkerStore
 from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
 from synapse.types import ReadReceipt
@@ -451,6 +452,7 @@ class GenericWorkerSlavedStore(
     SlavedFilteringStore,
     MonthlyActiveUsersWorkerStore,
     MediaRepositoryStore,
+    SearchWorkerStore,
     BaseSlavedStore,
 ):
     def __init__(self, database, db_conn, hs):
diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py
index ee75b92344..13f49d8060 100644
--- a/synapse/storage/data_stores/main/search.py
+++ b/synapse/storage/data_stores/main/search.py
@@ -37,7 +37,55 @@ SearchEntry = namedtuple(
 )
 
 
-class SearchBackgroundUpdateStore(SQLBaseStore):
+class SearchWorkerStore(SQLBaseStore):
+    def store_search_entries_txn(self, txn, entries):
+        """Add entries to the search table
+
+        Args:
+            txn (cursor):
+            entries (iterable[SearchEntry]):
+                entries to be added to the table
+        """
+        if not self.hs.config.enable_search:
+            return
+        if isinstance(self.database_engine, PostgresEngine):
+            sql = (
+                "INSERT INTO event_search"
+                " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
+                " VALUES (?,?,?,to_tsvector('english', ?),?,?)"
+            )
+
+            args = (
+                (
+                    entry.event_id,
+                    entry.room_id,
+                    entry.key,
+                    entry.value,
+                    entry.stream_ordering,
+                    entry.origin_server_ts,
+                )
+                for entry in entries
+            )
+
+            txn.executemany(sql, args)
+
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            sql = (
+                "INSERT INTO event_search (event_id, room_id, key, value)"
+                " VALUES (?,?,?,?)"
+            )
+            args = (
+                (entry.event_id, entry.room_id, entry.key, entry.value)
+                for entry in entries
+            )
+
+            txn.executemany(sql, args)
+        else:
+            # This should be unreachable.
+            raise Exception("Unrecognized database engine")
+
+
+class SearchBackgroundUpdateStore(SearchWorkerStore):
 
     EVENT_SEARCH_UPDATE_NAME = "event_search"
     EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
@@ -296,52 +344,6 @@ class SearchBackgroundUpdateStore(SQLBaseStore):
 
         return num_rows
 
-    def store_search_entries_txn(self, txn, entries):
-        """Add entries to the search table
-
-        Args:
-            txn (cursor):
-            entries (iterable[SearchEntry]):
-                entries to be added to the table
-        """
-        if not self.hs.config.enable_search:
-            return
-        if isinstance(self.database_engine, PostgresEngine):
-            sql = (
-                "INSERT INTO event_search"
-                " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
-                " VALUES (?,?,?,to_tsvector('english', ?),?,?)"
-            )
-
-            args = (
-                (
-                    entry.event_id,
-                    entry.room_id,
-                    entry.key,
-                    entry.value,
-                    entry.stream_ordering,
-                    entry.origin_server_ts,
-                )
-                for entry in entries
-            )
-
-            txn.executemany(sql, args)
-
-        elif isinstance(self.database_engine, Sqlite3Engine):
-            sql = (
-                "INSERT INTO event_search (event_id, room_id, key, value)"
-                " VALUES (?,?,?,?)"
-            )
-            args = (
-                (entry.event_id, entry.room_id, entry.key, entry.value)
-                for entry in entries
-            )
-
-            txn.executemany(sql, args)
-        else:
-            # This should be unreachable.
-            raise Exception("Unrecognized database engine")
-
 
 class SearchStore(SearchBackgroundUpdateStore):
     def __init__(self, database: Database, db_conn, hs):